Skip to main content

snarkvm_utilities/
parallel.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkVM library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
/// A collection of deferred jobs, each a boxed closure that yields a `T` when invoked.
///
/// Every job must be `Send` and may borrow data that lives for at least `'a`.
pub struct ExecutionPool<'a, T> {
    /// The queued jobs awaiting execution.
    jobs: Vec<Box<dyn FnOnce() -> T + Send + 'a>>,
}
19
20impl<'a, T> ExecutionPool<'a, T> {
21    pub fn new() -> Self {
22        Self { jobs: Vec::new() }
23    }
24
25    pub fn with_capacity(cap: usize) -> Self {
26        Self { jobs: Vec::with_capacity(cap) }
27    }
28
29    pub fn add_job<F: 'a + FnOnce() -> T + Send>(&mut self, f: F) {
30        self.jobs.push(Box::new(f));
31    }
32
33    pub fn execute_all(self) -> Vec<T>
34    where
35        T: Send + Sync,
36    {
37        #[cfg(not(feature = "serial"))]
38        {
39            use rayon::prelude::*;
40            execute_with_max_available_threads(|| self.jobs.into_par_iter().map(|f| f()).collect())
41        }
42        #[cfg(feature = "serial")]
43        {
44            self.jobs.into_iter().map(|f| f()).collect()
45        }
46    }
47}
48
49impl<T> Default for ExecutionPool<'_, T> {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55#[cfg(not(feature = "serial"))]
56pub fn max_available_threads() -> usize {
57    use aleo_std::Cpu;
58    let rayon_threads = rayon::current_num_threads();
59
60    match aleo_std::get_cpu() {
61        Cpu::Intel => num_cpus::get_physical().min(rayon_threads),
62        Cpu::AMD | Cpu::Unknown => rayon_threads,
63    }
64}
65
66#[inline(always)]
67#[cfg(not(any(feature = "serial", feature = "wasm")))]
68pub fn execute_with_max_available_threads<T: Sync + Send>(f: impl FnOnce() -> T + Send) -> T {
69    execute_with_threads(f, max_available_threads())
70}
71
/// Variant used when the `serial` or `wasm` feature is enabled: calls `f` directly on the
/// current thread and returns its result, without involving any thread pool.
#[cfg(any(feature = "serial", feature = "wasm"))]
#[inline(always)]
pub fn execute_with_max_available_threads<T>(f: impl FnOnce() -> T + Send) -> T {
    f()
}
77
78#[cfg(not(any(feature = "serial", feature = "wasm")))]
79#[inline(always)]
80fn execute_with_threads<T: Sync + Send>(f: impl FnOnce() -> T + Send, num_threads: usize) -> T {
81    if rayon::current_thread_index().is_none() {
82        let pool = rayon::ThreadPoolBuilder::new().num_threads(num_threads).build().unwrap();
83        pool.install(f)
84    } else {
85        f()
86    }
87}
88
/// Creates an iterator over references: parallel by default, sequential when the `serial`
/// feature is enabled.
///
/// - `cfg_iter!(e)` expands to `e.par_iter()`, or to `e.iter()` under `serial`.
/// - `cfg_iter!(e, min)` additionally applies `.with_min_len(min)` to the parallel iterator;
///   `min` does not appear in the `serial` expansion.
#[macro_export]
macro_rules! cfg_iter {
    ($e: expr) => {{
        #[cfg(feature = "serial")]
        let iter = $e.iter();

        #[cfg(not(feature = "serial"))]
        let iter = $e.par_iter();

        iter
    }};

    ($e: expr, $min: expr) => {{
        #[cfg(feature = "serial")]
        let iter = $e.iter();

        #[cfg(not(feature = "serial"))]
        let iter = $e.par_iter().with_min_len($min);

        iter
    }};
}
112
/// Creates an iterator over mutable references: parallel by default, sequential when the
/// `serial` feature is enabled.
///
/// - `cfg_iter_mut!(e)` expands to `e.par_iter_mut()`, or to `e.iter_mut()` under `serial`.
/// - `cfg_iter_mut!(e, min)` additionally applies `.with_min_len(min)` to the parallel iterator;
///   `min` does not appear in the `serial` expansion.
#[macro_export]
macro_rules! cfg_iter_mut {
    ($e: expr) => {{
        #[cfg(feature = "serial")]
        let iter = $e.iter_mut();

        #[cfg(not(feature = "serial"))]
        let iter = $e.par_iter_mut();

        iter
    }};

    ($e: expr, $min: expr) => {{
        #[cfg(feature = "serial")]
        let iter = $e.iter_mut();

        #[cfg(not(feature = "serial"))]
        let iter = $e.par_iter_mut().with_min_len($min);

        iter
    }};
}
136
/// Creates a consuming iterator: `$e.into_par_iter()` by default, or `$e.into_iter()` when the
/// `serial` feature is enabled.
#[macro_export]
macro_rules! cfg_into_iter {
    ($e: expr) => {{
        #[cfg(feature = "serial")]
        let iter = $e.into_iter();

        #[cfg(not(feature = "serial"))]
        let iter = $e.into_par_iter();

        iter
    }};
}
150
/// Returns an iterator over chunks of `$size` elements of the slice at a time:
/// `$e.par_chunks($size)` by default, or `$e.chunks($size)` when the `serial` feature is enabled.
#[macro_export]
macro_rules! cfg_chunks {
    ($e: expr, $size: expr) => {{
        #[cfg(feature = "serial")]
        let chunks = $e.chunks($size);

        #[cfg(not(feature = "serial"))]
        let chunks = $e.par_chunks($size);

        chunks
    }};
}
165
/// Returns an iterator over mutable chunks of `$size` elements of the slice at a time:
/// `$e.par_chunks_mut($size)` by default, or `$e.chunks_mut($size)` when the `serial` feature
/// is enabled.
#[macro_export]
macro_rules! cfg_chunks_mut {
    ($e: expr, $size: expr) => {{
        #[cfg(feature = "serial")]
        let chunks = $e.chunks_mut($size);

        #[cfg(not(feature = "serial"))]
        let chunks = $e.par_chunks_mut($size);

        chunks
    }};
}
179
/// Converts an iterator into a parallel one via `$e.par_bridge()`; when the `serial` feature is
/// enabled, `$e` is returned unchanged.
#[macro_export]
macro_rules! cfg_par_bridge {
    ($e: expr) => {{
        #[cfg(feature = "serial")]
        let bridged = $e;

        #[cfg(not(feature = "serial"))]
        let bridged = $e.par_bridge();

        bridged
    }};
}
193
/// Reduces the iterator `$e` to a single value with `$op`.
///
/// `$default` must be callable with no arguments:
/// - by default it is handed as-is to `$e.reduce($default, $op)`;
/// - under the `serial` feature it is invoked once and its result seeds `$e.fold(.., $op)`.
#[macro_export]
macro_rules! cfg_reduce {
    ($e: expr, $default: expr, $op: expr) => {{
        #[cfg(feature = "serial")]
        let reduced = $e.fold($default(), $op);

        #[cfg(not(feature = "serial"))]
        let reduced = $e.reduce($default, $op);

        reduced
    }};
}
207
/// Reduces the iterator `$e` with `$op` without a seed value: expands to `$e.reduce_with($op)`
/// by default, or to `$e.reduce($op)` when the `serial` feature is enabled.
#[macro_export]
macro_rules! cfg_reduce_with {
    ($e: expr, $op: expr) => {{
        #[cfg(feature = "serial")]
        let reduced = $e.reduce($op);

        #[cfg(not(feature = "serial"))]
        let reduced = $e.reduce_with($op);

        reduced
    }};
}
221
/// Creates an iterator over the keys of `$e`: `$e.par_keys()` by default, or `$e.keys()` when
/// the `serial` feature is enabled.
#[macro_export]
macro_rules! cfg_keys {
    ($e: expr) => {{
        #[cfg(feature = "serial")]
        let keys = $e.keys();

        #[cfg(not(feature = "serial"))]
        let keys = $e.par_keys();

        keys
    }};
}
235
/// Creates an iterator over the values of `$e`: `$e.par_values()` by default, or `$e.values()`
/// when the `serial` feature is enabled.
#[macro_export]
macro_rules! cfg_values {
    ($e: expr) => {{
        #[cfg(feature = "serial")]
        let values = $e.values();

        #[cfg(not(feature = "serial"))]
        let values = $e.par_values();

        values
    }};
}
249
/// Searches the values of `$object` for an element `e` where `closure(e)` evaluates to true
/// (if any).
///
/// Expands to `$object.par_values().find_any($closure)` by default, or to
/// `$object.values().find($closure)` when the `serial` feature is enabled.
///
/// # Notes
/// - This returns at most one entry that satisfies the given condition, not necessarily the first one.
/// - `closure` must be a lambda function returning a boolean, e.g., `|e| e > 0`.
#[macro_export]
macro_rules! cfg_find {
    ($object:expr, $closure:expr) => {{
        #[cfg(feature = "serial")]
        let found = $object.values().find($closure);

        #[cfg(not(feature = "serial"))]
        let found = $object.par_values().find_any($closure);

        found
    }};
}
267
/// Applies `closure` to the values of `$object` and returns a result for which `closure(e)`
/// is not `None` (if any).
///
/// Expands to `$object.par_values().filter_map($closure).find_any(|_| true)` by default, or to
/// `$object.values().find_map($closure)` when the `serial` feature is enabled.
///
/// # Notes
/// - This returns at most one entry that satisfies the given condition, not necessarily the first one.
/// - `closure` must be a lambda function returning Option, e.g., `|e| Some(e)`.
#[macro_export]
macro_rules! cfg_find_map {
    ($object:expr, $closure:expr) => {{
        #[cfg(feature = "serial")]
        let found = $object.values().find_map($closure);

        #[cfg(not(feature = "serial"))]
        let found = $object.par_values().filter_map($closure).find_any(|_| true);

        found
    }};
}
285
/// Zips `$self` with `$other` using `zip_eq` and folds the resulting pairs with `$op`.
///
/// `$init` must be callable with no arguments and produce the initial accumulator:
/// - by default it is passed as-is to `fold($init, $op)`, and the folded partial results are
///   then combined with `.sum::<$type>()`;
/// - under the `serial` feature it is invoked once and its result seeds `fold(.., $op)`;
///   `$type` is not used in that expansion.
///
/// `$init` is expanded exactly once in either configuration, so it may be a `move` closure
/// that captures non-`Copy` values.
#[macro_export]
macro_rules! cfg_zip_fold {
    ($self: expr, $other: expr, $init: expr, $op: expr, $type: ty) => {{
        // The parallel `fold` takes the seed factory itself.
        #[cfg(not(feature = "serial"))]
        let seed = $init;

        // The sequential `fold` takes the seed value, so call the factory here.
        #[cfg(feature = "serial")]
        let seed = $init();

        let result = $self.zip_eq($other).fold(seed, $op);

        // The parallel `fold` yields partial accumulators that still need to be combined.
        #[cfg(not(feature = "serial"))]
        let result = result.sum::<$type>();

        result
    }};
}
302
/// Sorts `$self` in place with an unstable sort ordered by `$closure`: expands to
/// `$self.par_sort_unstable_by($closure)` by default, or to `$self.sort_unstable_by($closure)`
/// when the `serial` feature is enabled.
#[macro_export]
macro_rules! cfg_sort_unstable_by {
    ($self: expr, $closure: expr) => {{
        #[cfg(not(feature = "serial"))]
        $self.par_sort_unstable_by($closure);

        #[cfg(feature = "serial")]
        $self.sort_unstable_by($closure);
    }};
}
314
/// Sorts `$self` in place by the keys that `$closure` extracts, caching those keys: expands to
/// `$self.par_sort_by_cached_key($closure)` by default, or to `$self.sort_by_cached_key($closure)`
/// when the `serial` feature is enabled.
#[macro_export]
macro_rules! cfg_sort_by_cached_key {
    ($self: expr, $closure: expr) => {{
        #[cfg(not(feature = "serial"))]
        $self.par_sort_by_cached_key($closure);

        #[cfg(feature = "serial")]
        $self.sort_by_cached_key($closure);
    }};
}
326
/// Returns a sorted, by-value iterator for the given IndexMap/IndexSet, ordered by `$closure`:
/// expands to `$self.par_sorted_by($closure)` by default, or to `$self.sorted_by($closure)`
/// when the `serial` feature is enabled.
#[macro_export]
macro_rules! cfg_sorted_by {
    ($self: expr, $closure: expr) => {{
        #[cfg(not(feature = "serial"))]
        {
            $self.par_sorted_by($closure)
        }

        #[cfg(feature = "serial")]
        {
            $self.sorted_by($closure)
        }
    }};
}
341}