Skip to main content

openmp_reducer/
lib.rs

1/*
2 * SPDX-FileCopyrightText: 2025 Tommaso Fontana
3 * SPDX-FileCopyrightText: 2025 Sebastiano Vigna
4 *
5 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6 */
7
8#![doc = include_str!("../README.md")]
9
10use std::fmt::{self, Debug};
11use std::marker::PhantomData;
12use std::sync::Mutex;
13
14/// An OpenMP-style reducer that wraps a global value into a [`Mutex`],
15/// providing [shareable, cloneable copies with a local value](#method.share);
16/// the copies will be reduced into the global value when dropped.
17///
18/// The global value can be observed with [`peek`](Reducer::peek) if the base
19/// type is [`Clone`], whereas [`get`](Reducer::get) consumes self and returns
20/// the global value.
21///
22/// For convenience, the global value and the local value have distinct type
23/// parameters `G` and `L`, respectively; the second type defaults to the first
24/// one. The reduction function has type parameter `F`, defaulting to
25/// `fn(&mut G, &L)`.
26///
27/// Each shared copy has a reference to the [`Reducer`] it was created from, so
28/// you cannot call [`get`](Reducer::get) if there are still shared copies
29/// around. For example, this code will not compile:
30/// ```compile_fail
31/// use openmp_reducer::Reducer;
32///
33/// let reducer = Reducer::<usize>::new(3, |global, local| *global += *local);
34/// let mut shared = reducer.share();
35/// // drop(shared); // uncommenting this line would make the code compile
36/// assert_eq!(reducer.get(), 3);
37///```
38///
39/// # Thread Safety
40///
41/// [`Reducer`] is [`Send`] and [`Sync`] when `G: Send` and `F: Send + Sync`.
42/// [`SharedReducer`] is [`Send`] when additionally `L: Send`.
43///
44/// # Examples
45///
/// In this example, we manually spawn threads:
47///
48/// ```rust
49/// use openmp_reducer::Reducer;
50/// use std::thread;
51///
52/// let reducer = Reducer::<usize>::new(5, |global, local| *global += *local);
53/// std::thread::scope(|s| {
54///     for i in 0..3 {
55///         let mut shared = reducer.share();
56///         s.spawn(move || {
57///             *shared.as_mut() += 10;
58///         });
59///     }
60/// });
61///
62/// // Initial value plus additional values from shared copies
63/// assert_eq!(reducer.get(), 35);
64/// ```
65///
66/// You can obtain the same behavior with [Rayon](https://docs.rs/rayon) using
67/// methods such as
68/// [`for_each_with`](https://docs.rs/rayon/latest/rayon/iter/trait.ParallelIterator.html#method.for_each_with)
69/// and
70/// [`map_with`](https://docs.rs/rayon/latest/rayon/iter/trait.ParallelIterator.html#method.map_with):
71///
72/// ```rust
73/// use openmp_reducer::Reducer;
74/// use rayon::prelude::*;
75///
76/// let reducer = Reducer::<usize>::new(5, |global, local| *global += *local);
77/// (0..1000000).into_par_iter().
78///     with_min_len(1000). // optional, might reduce the amount of cloning
79///     for_each_with(reducer.share(), |shared, i| {
80///         *shared.as_mut() += 1;
81///     }
82/// );
83///
84/// // Initial value plus additional values from clones
85/// assert_eq!(reducer.get(), 1_000_005);
86/// ```
87///
88/// Note that you have to pass `reducer.share()`, which can be cloned. Also,
89/// since
90/// [`for_each_with`](https://docs.rs/rayon/latest/rayon/iter/trait.ParallelIterator.html#method.for_each_with)
91/// might perform excessive cloning if jobs are too short, you can use
92/// [`with_min_len`](https://docs.rs/rayon/latest/rayon/iter/trait.ParallelIterator.html#method.with_min_len)
93/// to reduce the amount of cloning.
pub struct Reducer<G, L = G, F = fn(&mut G, &L)> {
    // The global value; it sits behind a mutex because shared copies reduce
    // into it through a shared reference when they are dropped.
    global: Mutex<G>,
    // The reduction function, invoked as `reduce(&mut global, &local)`.
    reduce: F,
    // Ties the type parameter `L` to the struct without storing a value of
    // that type. Being a function-pointer marker, it does not make the auto
    // traits (`Send`/`Sync`) of `Reducer` depend on `L`.
    _marker: PhantomData<fn(L)>,
}
99
100impl<G: Debug, L, F> Debug for Reducer<G, L, F> {
101    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102        f.debug_struct("Reducer")
103            .field("global", &self.global)
104            .finish_non_exhaustive()
105    }
106}
107
108impl<G, L, F: Fn(&mut G, &L)> Reducer<G, L, F> {
109    /// Creates a new reducer with a given reduction function.
110    ///
111    /// The function must reduce the local value (second argument) into the
112    /// global value (first argument). For the result to be deterministic, the
113    /// global value must be the same regardless of the order in which the local
114    /// values are reduced.
115    pub fn new(init: G, reduce: F) -> Self {
116        Reducer {
117            global: Mutex::new(init),
118            reduce,
119            _marker: PhantomData,
120        }
121    }
122}
123
124impl<G, L, F> Reducer<G, L, F> {
125    /// Consumes self and returns the global value.
126    ///
127    /// Note that you cannot call this method if there are still [shared
128    /// copies](#method.share) that have not been dropped.
129    ///
130    /// If you just need to access the global value without consuming self, and
131    /// the base type is [`Clone`], use [`peek`](Reducer::peek).
132    ///
133    /// # Panics
134    ///
135    /// This method will panic if the mutex is poisoned.
136    pub fn get(self) -> G {
137        self.global.into_inner().unwrap()
138    }
139}
140
141impl<G, L: Default, F: Fn(&mut G, &L)> Reducer<G, L, F> {
142    /// Returns a [`SharedReducer`] referencing this [`Reducer`].
143    ///
144    /// The [`SharedReducer`] will be initialized with the default value of the
145    /// local type.
146    pub fn share(&self) -> SharedReducer<'_, G, L, F> {
147        SharedReducer {
148            openmp_reducer: self,
149            local: L::default(),
150        }
151    }
152}
153
154impl<G: Clone, L, F> Reducer<G, L, F> {
155    /// Returns the current global value.
156    ///
157    /// Note that this method does not guarantee that all shared copies have
158    /// been dropped. If you need that guarantee, use [`get`](Reducer::get).
159    ///
160    /// # Panics
161    ///
162    /// This method will panic if the mutex is poisoned.
163    pub fn peek(&self) -> G {
164        self.global.lock().unwrap().clone()
165    }
166}
167
168/// A shareable copy of a [`Reducer`] containing a local value and implementing
169/// [`Clone`].
170///
171/// The local value can be accessed with [`AsRef`] and [`AsMut`]
172/// implementations.
173///
174/// When a [`SharedReducer`] is dropped, the local value will be reduced into
175/// the global value. If the mutex is poisoned (e.g., due to a panic in another
176/// thread), the reduction will still be performed on the recovered value.
177pub struct SharedReducer<'a, G, L = G, F: Fn(&mut G, &L) = fn(&mut G, &L)> {
178    openmp_reducer: &'a Reducer<G, L, F>,
179    local: L,
180}
181
182impl<G: Debug, L: Debug, F: Fn(&mut G, &L)> Debug for SharedReducer<'_, G, L, F> {
183    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
184        f.debug_struct("SharedReducer")
185            .field("openmp_reducer", &self.openmp_reducer)
186            .field("local", &self.local)
187            .finish()
188    }
189}
190
191impl<G, L: Default, F: Fn(&mut G, &L)> Clone for SharedReducer<'_, G, L, F> {
192    /// Returns a copy sharing the same global value and
193    /// with local value initialized to the default value.
194    fn clone(&self) -> Self {
195        SharedReducer {
196            openmp_reducer: self.openmp_reducer,
197            local: L::default(),
198        }
199    }
200}
201
202impl<G: Clone, L, F: Fn(&mut G, &L)> SharedReducer<'_, G, L, F> {
203    /// Returns the current global value.
204    ///
205    /// This method delegates to [`Reducer::peek`].
206    pub fn peek(&self) -> G {
207        self.openmp_reducer.peek()
208    }
209}
210
211impl<G, L, F: Fn(&mut G, &L)> Drop for SharedReducer<'_, G, L, F> {
212    /// Reduces the local value into the global value.
213    fn drop(&mut self) {
214        let mut lock = self.openmp_reducer.global.lock().unwrap_or_else(|e| e.into_inner());
215        (self.openmp_reducer.reduce)(&mut *lock, &self.local);
216    }
217}
218
219impl<G, L, F: Fn(&mut G, &L)> AsRef<L> for SharedReducer<'_, G, L, F> {
220    /// Returns a reference to the local value.
221    fn as_ref(&self) -> &L {
222        &self.local
223    }
224}
225
226impl<G, L, F: Fn(&mut G, &L)> AsMut<L> for SharedReducer<'_, G, L, F> {
227    /// Returns a mutable reference to the local value.
228    fn as_mut(&mut self) -> &mut L {
229        &mut self.local
230    }
231}