snarkvm_utilities/
parallel.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkVM library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16pub struct ExecutionPool<'a, T> {
17    jobs: Vec<Box<dyn 'a + FnOnce() -> T + Send>>,
18}
19
20impl<'a, T> ExecutionPool<'a, T> {
21    pub fn new() -> Self {
22        Self { jobs: Vec::new() }
23    }
24
25    pub fn with_capacity(cap: usize) -> Self {
26        Self { jobs: Vec::with_capacity(cap) }
27    }
28
29    pub fn add_job<F: 'a + FnOnce() -> T + Send>(&mut self, f: F) {
30        self.jobs.push(Box::new(f));
31    }
32
33    pub fn execute_all(self) -> Vec<T>
34    where
35        T: Send + Sync,
36    {
37        #[cfg(not(feature = "serial"))]
38        {
39            use rayon::prelude::*;
40            execute_with_max_available_threads(|| self.jobs.into_par_iter().map(|f| f()).collect())
41        }
42        #[cfg(feature = "serial")]
43        {
44            self.jobs.into_iter().map(|f| f()).collect()
45        }
46    }
47}
48
49impl<T> Default for ExecutionPool<'_, T> {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55#[cfg(not(feature = "serial"))]
56pub fn max_available_threads() -> usize {
57    use aleo_std::Cpu;
58    let rayon_threads = rayon::current_num_threads();
59
60    match aleo_std::get_cpu() {
61        Cpu::Intel => num_cpus::get_physical().min(rayon_threads),
62        Cpu::AMD | Cpu::Unknown => rayon_threads,
63    }
64}
65
66#[inline(always)]
67#[cfg(not(any(feature = "serial", feature = "wasm")))]
68pub fn execute_with_max_available_threads<T: Sync + Send>(f: impl FnOnce() -> T + Send) -> T {
69    execute_with_threads(f, max_available_threads())
70}
71
/// Runs `f` directly on the calling thread and returns its result.
///
/// This variant is compiled when the `serial` or the `wasm` feature is enabled; it does not
/// set up any thread pool.
#[cfg(any(feature = "serial", feature = "wasm"))]
#[inline(always)]
pub fn execute_with_max_available_threads<T>(f: impl FnOnce() -> T + Send) -> T {
    f()
}
77
78#[cfg(not(any(feature = "serial", feature = "wasm")))]
79#[inline(always)]
80fn execute_with_threads<T: Sync + Send>(f: impl FnOnce() -> T + Send, num_threads: usize) -> T {
81    if rayon::current_thread_index().is_none() {
82        let pool = rayon::ThreadPoolBuilder::new().num_threads(num_threads).build().unwrap();
83        pool.install(f)
84    } else {
85        f()
86    }
87}
88
/// Creates an iterator over references to the elements of the given expression.
///
/// Expands to `.par_iter()` by default, and to `.iter()` when the `serial` feature is enabled.
#[macro_export]
macro_rules! cfg_iter {
    ($items:expr) => {{
        #[cfg(feature = "serial")]
        let iter = $items.iter();

        #[cfg(not(feature = "serial"))]
        let iter = $items.par_iter();

        iter
    }};
}
102
/// Creates an iterator over mutable references to the elements of the given expression.
///
/// Expands to `.par_iter_mut()` by default, and to `.iter_mut()` when the `serial` feature is
/// enabled.
#[macro_export]
macro_rules! cfg_iter_mut {
    ($items:expr) => {{
        #[cfg(feature = "serial")]
        let iter = $items.iter_mut();

        #[cfg(not(feature = "serial"))]
        let iter = $items.par_iter_mut();

        iter
    }};
}
116
/// Converts the given expression into an iterator.
///
/// Expands to `.into_par_iter()` by default, and to `.into_iter()` when the `serial` feature is
/// enabled.
#[macro_export]
macro_rules! cfg_into_iter {
    ($items:expr) => {{
        #[cfg(feature = "serial")]
        let iter = $items.into_iter();

        #[cfg(not(feature = "serial"))]
        let iter = $items.into_par_iter();

        iter
    }};
}
130
/// Returns an iterator over chunks of `$size` elements of the slice at a time.
///
/// Expands to `.par_chunks($size)` by default, and to `.chunks($size)` when the `serial`
/// feature is enabled.
#[macro_export]
macro_rules! cfg_chunks {
    ($slice:expr, $chunk_size:expr) => {{
        #[cfg(feature = "serial")]
        let chunks = $slice.chunks($chunk_size);

        #[cfg(not(feature = "serial"))]
        let chunks = $slice.par_chunks($chunk_size);

        chunks
    }};
}
145
/// Returns an iterator over mutable chunks of `$size` elements of the slice at a time.
///
/// Expands to `.par_chunks_mut($size)` by default, and to `.chunks_mut($size)` when the
/// `serial` feature is enabled.
#[macro_export]
macro_rules! cfg_chunks_mut {
    ($slice:expr, $chunk_size:expr) => {{
        #[cfg(feature = "serial")]
        let chunks = $slice.chunks_mut($chunk_size);

        #[cfg(not(feature = "serial"))]
        let chunks = $slice.par_chunks_mut($chunk_size);

        chunks
    }};
}
159
/// Bridges an existing iterator into a parallel one.
///
/// Expands to `.par_bridge()` by default; when the `serial` feature is enabled the iterator is
/// returned unchanged.
#[macro_export]
macro_rules! cfg_par_bridge {
    ($iter:expr) => {{
        #[cfg(feature = "serial")]
        let bridged = $iter;

        #[cfg(not(feature = "serial"))]
        let bridged = $iter.par_bridge();

        bridged
    }};
}
173
/// Reduces an iterator to a single value using `$op`.
///
/// By default this expands to `.reduce($default, $op)`, passing `$default` itself as the
/// identity. When the `serial` feature is enabled it expands to `.fold($default(), $op)`, so
/// `$default` must be callable with no arguments.
#[macro_export]
macro_rules! cfg_reduce {
    ($iter:expr, $identity:expr, $combine:expr) => {{
        #[cfg(feature = "serial")]
        let reduced = $iter.fold($identity(), $combine);

        #[cfg(not(feature = "serial"))]
        let reduced = $iter.reduce($identity, $combine);

        reduced
    }};
}
187
/// Reduces an iterator with `$op`, without an explicit identity value.
///
/// Expands to `.reduce_with($op)` by default, and to `.reduce($op)` when the `serial` feature
/// is enabled.
#[macro_export]
macro_rules! cfg_reduce_with {
    ($iter:expr, $combine:expr) => {{
        #[cfg(feature = "serial")]
        let reduced = $iter.reduce($combine);

        #[cfg(not(feature = "serial"))]
        let reduced = $iter.reduce_with($combine);

        reduced
    }};
}
201
/// Creates an iterator over the keys of the given collection.
///
/// Expands to `.par_keys()` by default, and to `.keys()` when the `serial` feature is enabled.
#[macro_export]
macro_rules! cfg_keys {
    ($map:expr) => {{
        #[cfg(feature = "serial")]
        let keys = $map.keys();

        #[cfg(not(feature = "serial"))]
        let keys = $map.par_keys();

        keys
    }};
}
215
/// Creates an iterator over the values of the given collection.
///
/// Expands to `.par_values()` by default, and to `.values()` when the `serial` feature is
/// enabled.
#[macro_export]
macro_rules! cfg_values {
    ($map:expr) => {{
        #[cfg(feature = "serial")]
        let values = $map.values();

        #[cfg(not(feature = "serial"))]
        let values = $map.par_values();

        values
    }};
}
229
/// Finds a value `e` of the given collection for which `closure(e)` evaluates to true (if any).
///
/// Expands to `.par_values().find_any(closure)` by default, and to `.values().find(closure)`
/// when the `serial` feature is enabled.
///
/// # Notes
/// - This returns at most one entry that satisfies the given condition, not necessarily the first one.
/// - `closure` must be a predicate returning a boolean.
#[macro_export]
macro_rules! cfg_find {
    ($map:expr, $predicate:expr) => {{
        #[cfg(feature = "serial")]
        let found = $map.values().find($predicate);

        #[cfg(not(feature = "serial"))]
        let found = $map.par_values().find_any($predicate);

        found
    }};
}
247
/// Applies `closure` to the values of the given collection and returns a result that is not
/// `None` (if any).
///
/// Expands to `.par_values().find_map_any(closure)` by default, and to
/// `.values().find_map(closure)` when the `serial` feature is enabled.
///
/// # Notes
/// - This returns at most one entry that satisfies the given condition, not necessarily the first one.
/// - `closure` must be a lambda function returning Option, e.g., `|e| Some(e)`.
#[macro_export]
macro_rules! cfg_find_map {
    ($object:expr, $closure:expr) => {{
        // `find_map_any` is the dedicated combinator for "map, then take any `Some`"; it replaces
        // the hand-rolled `filter_map(..).find_any(|_| true)` chain.
        #[cfg(not(feature = "serial"))]
        let result = $object.par_values().find_map_any($closure);

        #[cfg(feature = "serial")]
        let result = $object.values().find_map($closure);

        result
    }};
}
265
/// Folds the zipped pairs of two iterators into a single value.
///
/// `$self.zip_eq($other)` is folded with `$op`, seeded from `$init`:
/// - By default, `$init` itself is handed to `fold` and the folded results are then combined
///   with `sum::<$type>()`.
/// - With the `serial` feature, `$init` is called once and its result seeds `fold`; `$type` is
///   not used.
///
/// # Notes
/// - `$init` must be callable with no arguments, e.g., `|| 0u64`.
/// - `$init` is expanded exactly once, so a `move` closure that captures a non-`Copy` value
///   works under both feature sets.
#[macro_export]
macro_rules! cfg_zip_fold {
    ($self: expr, $other: expr, $init: expr, $op: expr, $type: ty) => {{
        let default = $init;

        // Call the already-bound initializer instead of expanding `$init` a second time.
        #[cfg(feature = "serial")]
        let default = default();

        let result = $self.zip_eq($other).fold(default, $op);

        #[cfg(not(feature = "serial"))]
        let result = result.sum::<$type>();

        result
    }};
}
282
/// Sorts the given collection in place with an unstable sort, ordering elements by `$closure`.
///
/// Expands to `.par_sort_unstable_by($closure)` by default, and to
/// `.sort_unstable_by($closure)` when the `serial` feature is enabled.
#[macro_export]
macro_rules! cfg_sort_unstable_by {
    ($items:expr, $compare:expr) => {{
        #[cfg(not(feature = "serial"))]
        $items.par_sort_unstable_by($compare);

        #[cfg(feature = "serial")]
        $items.sort_unstable_by($compare);
    }};
}
294
/// Sorts the given collection in place by the keys that `$closure` extracts, caching the keys.
///
/// Expands to `.par_sort_by_cached_key($closure)` by default, and to
/// `.sort_by_cached_key($closure)` when the `serial` feature is enabled.
#[macro_export]
macro_rules! cfg_sort_by_cached_key {
    ($items:expr, $key_fn:expr) => {{
        #[cfg(not(feature = "serial"))]
        $items.par_sort_by_cached_key($key_fn);

        #[cfg(feature = "serial")]
        $items.sort_by_cached_key($key_fn);
    }};
}
306
/// Returns a sorted, by-value iterator for the given IndexMap/IndexSet, ordered by `$closure`.
///
/// Expands to `.par_sorted_by($closure)` by default, and to `.sorted_by($closure)` when the
/// `serial` feature is enabled.
#[macro_export]
macro_rules! cfg_sorted_by {
    ($collection:expr, $compare:expr) => {{
        #[cfg(not(feature = "serial"))]
        {
            $collection.par_sorted_by($compare)
        }

        #[cfg(feature = "serial")]
        {
            $collection.sorted_by($compare)
        }
    }};
}
321}