snarkvm_utilities/
parallel.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkVM library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{boxed::Box, vec::Vec};
17
18pub struct ExecutionPool<'a, T> {
19    jobs: Vec<Box<dyn 'a + FnOnce() -> T + Send>>,
20}
21
22impl<'a, T> ExecutionPool<'a, T> {
23    pub fn new() -> Self {
24        Self { jobs: Vec::new() }
25    }
26
27    pub fn with_capacity(cap: usize) -> Self {
28        Self { jobs: Vec::with_capacity(cap) }
29    }
30
31    pub fn add_job<F: 'a + FnOnce() -> T + Send>(&mut self, f: F) {
32        self.jobs.push(Box::new(f));
33    }
34
35    pub fn execute_all(self) -> Vec<T>
36    where
37        T: Send + Sync,
38    {
39        #[cfg(not(feature = "serial"))]
40        {
41            use rayon::prelude::*;
42            execute_with_max_available_threads(|| self.jobs.into_par_iter().map(|f| f()).collect())
43        }
44        #[cfg(feature = "serial")]
45        {
46            self.jobs.into_iter().map(|f| f()).collect()
47        }
48    }
49}
50
51impl<T> Default for ExecutionPool<'_, T> {
52    fn default() -> Self {
53        Self::new()
54    }
55}
56
57#[cfg(not(feature = "serial"))]
58pub fn max_available_threads() -> usize {
59    use aleo_std::Cpu;
60    let rayon_threads = rayon::current_num_threads();
61
62    match aleo_std::get_cpu() {
63        Cpu::Intel => num_cpus::get_physical().min(rayon_threads),
64        Cpu::AMD | Cpu::Unknown => rayon_threads,
65    }
66}
67
68#[inline(always)]
69#[cfg(not(any(feature = "serial", feature = "wasm")))]
70pub fn execute_with_max_available_threads<T: Sync + Send>(f: impl FnOnce() -> T + Send) -> T {
71    execute_with_threads(f, max_available_threads())
72}
73
74#[inline(always)]
75#[cfg(any(feature = "serial", feature = "wasm"))]
76pub fn execute_with_max_available_threads<T>(f: impl FnOnce() -> T + Send) -> T {
77    f()
78}
79
80#[cfg(not(any(feature = "serial", feature = "wasm")))]
81#[inline(always)]
82fn execute_with_threads<T: Sync + Send>(f: impl FnOnce() -> T + Send, num_threads: usize) -> T {
83    if rayon::current_thread_index().is_none() {
84        let pool = rayon::ThreadPoolBuilder::new().num_threads(num_threads).build().unwrap();
85        pool.install(f)
86    } else {
87        f()
88    }
89}
90
91/// Creates parallel iterator over refs if `parallel` feature is enabled.
92#[macro_export]
93macro_rules! cfg_iter {
94    ($e: expr) => {{
95        #[cfg(not(feature = "serial"))]
96        let result = $e.par_iter();
97
98        #[cfg(feature = "serial")]
99        let result = $e.iter();
100
101        result
102    }};
103}
104
105/// Creates parallel iterator over mut refs if `parallel` feature is enabled.
106#[macro_export]
107macro_rules! cfg_iter_mut {
108    ($e: expr) => {{
109        #[cfg(not(feature = "serial"))]
110        let result = $e.par_iter_mut();
111
112        #[cfg(feature = "serial")]
113        let result = $e.iter_mut();
114
115        result
116    }};
117}
118
119/// Creates parallel iterator if `parallel` feature is enabled.
120#[macro_export]
121macro_rules! cfg_into_iter {
122    ($e: expr) => {{
123        #[cfg(not(feature = "serial"))]
124        let result = $e.into_par_iter();
125
126        #[cfg(feature = "serial")]
127        let result = $e.into_iter();
128
129        result
130    }};
131}
132
133/// Returns an iterator over `chunk_size` elements of the slice at a
134/// time.
135#[macro_export]
136macro_rules! cfg_chunks {
137    ($e: expr, $size: expr) => {{
138        #[cfg(not(feature = "serial"))]
139        let result = $e.par_chunks($size);
140
141        #[cfg(feature = "serial")]
142        let result = $e.chunks($size);
143
144        result
145    }};
146}
147
148/// Returns an iterator over `chunk_size` elements of the slice at a time.
149#[macro_export]
150macro_rules! cfg_chunks_mut {
151    ($e: expr, $size: expr) => {{
152        #[cfg(not(feature = "serial"))]
153        let result = $e.par_chunks_mut($size);
154
155        #[cfg(feature = "serial")]
156        let result = $e.chunks_mut($size);
157
158        result
159    }};
160}
161
162/// Creates parallel iterator from iterator if `parallel` feature is enabled.
163#[macro_export]
164macro_rules! cfg_par_bridge {
165    ($e: expr) => {{
166        #[cfg(not(feature = "serial"))]
167        let result = $e.par_bridge();
168
169        #[cfg(feature = "serial")]
170        let result = $e;
171
172        result
173    }};
174}
175
176/// Applies the reduce operation over an iterator.
177#[macro_export]
178macro_rules! cfg_reduce {
179    ($e: expr, $default: expr, $op: expr) => {{
180        #[cfg(not(feature = "serial"))]
181        let result = $e.reduce($default, $op);
182
183        #[cfg(feature = "serial")]
184        let result = $e.fold($default(), $op);
185
186        result
187    }};
188}
189
190/// Applies `reduce_with` or `reduce` depending on the `serial` feature.
191#[macro_export]
192macro_rules! cfg_reduce_with {
193    ($e: expr, $op: expr) => {{
194        #[cfg(not(feature = "serial"))]
195        let result = $e.reduce_with($op);
196
197        #[cfg(feature = "serial")]
198        let result = $e.reduce($op);
199
200        result
201    }};
202}
203
204/// Turns a collection into an iterator.
205#[macro_export]
206macro_rules! cfg_keys {
207    ($e: expr) => {{
208        #[cfg(not(feature = "serial"))]
209        let result = $e.par_keys();
210
211        #[cfg(feature = "serial")]
212        let result = $e.keys();
213
214        result
215    }};
216}
217
218/// Turns a collection into an iterator.
219#[macro_export]
220macro_rules! cfg_values {
221    ($e: expr) => {{
222        #[cfg(not(feature = "serial"))]
223        let result = $e.par_values();
224
225        #[cfg(feature = "serial")]
226        let result = $e.values();
227
228        result
229    }};
230}
231
232/// Find an element `e` where `lambda(e)` evalutes to true (if any).
233///
234/// # Notes
235/// - This returns at most one entry that satisfies the given condition, not necessarily the first one.
236/// - `closure` must be a lambda function returning a boolean, e.g., `|e| e > 0`.
237#[macro_export]
238macro_rules! cfg_find {
239    ($object:expr, $closure:expr) => {{
240        #[cfg(not(feature = "serial"))]
241        let result = $object.par_values().find_any($closure);
242
243        #[cfg(feature = "serial")]
244        let result = $object.values().find($closure);
245
246        result
247    }};
248}
249
250/// Applies a function and returns an entry where `lambda(e)` is not None.
251///
252/// # Notes
253/// - This returns at most one entry that satisfies the given condition, not necessarily the first one.
254/// - `closure` must be a lambda function returning Option, e.g., `|e| Some(e)`.
255#[macro_export]
256macro_rules! cfg_find_map {
257    ($object:expr, $closure:expr) => {{
258        #[cfg(not(feature = "serial"))]
259        let result = $object.par_values().filter_map($closure).find_any(|_| true);
260
261        #[cfg(feature = "serial")]
262        let result = $object.values().find_map($closure);
263
264        result
265    }};
266}
267
268/// Applies fold to the iterator
269#[macro_export]
270macro_rules! cfg_zip_fold {
271    ($self: expr, $other: expr, $init: expr, $op: expr, $type: ty) => {{
272        let default = $init;
273
274        #[cfg(feature = "serial")]
275        let default = $init();
276        let result = $self.zip_eq($other).fold(default, $op);
277
278        #[cfg(not(feature = "serial"))]
279        let result = result.sum::<$type>();
280
281        result
282    }};
283}
284
285/// Performs an unstable sort
286#[macro_export]
287macro_rules! cfg_sort_unstable_by {
288    ($self: expr, $closure: expr) => {{
289        #[cfg(feature = "serial")]
290        $self.sort_unstable_by($closure);
291
292        #[cfg(not(feature = "serial"))]
293        $self.par_sort_unstable_by($closure);
294    }};
295}
296
297/// Performs a sort that caches the extracted keys
298#[macro_export]
299macro_rules! cfg_sort_by_cached_key {
300    ($self: expr, $closure: expr) => {{
301        #[cfg(feature = "serial")]
302        $self.sort_by_cached_key($closure);
303
304        #[cfg(not(feature = "serial"))]
305        $self.par_sort_by_cached_key($closure);
306    }};
307}
308
309/// Returns a sorted, by-value iterator for the given IndexMap/IndexSet
310#[macro_export]
311macro_rules! cfg_sorted_by {
312    ($self: expr, $closure: expr) => {{
313        #[cfg(feature = "serial")]
314        {
315            $self.sorted_by($closure)
316        }
317
318        #[cfg(not(feature = "serial"))]
319        {
320            $self.par_sorted_by($closure)
321        }
322    }};
323}