Skip to main content

adze_concurrency_bounded_map_core/
lib.rs

1//! Core bounded parallel map implementation.
2
3#![forbid(unsafe_op_in_unsafe_fn)]
4#![deny(missing_docs)]
5#![cfg_attr(feature = "strict_api", deny(unreachable_pub))]
6#![cfg_attr(not(feature = "strict_api"), warn(unreachable_pub))]
7#![cfg_attr(feature = "strict_docs", deny(missing_docs))]
8#![cfg_attr(not(feature = "strict_docs"), allow(missing_docs))]
9
10use rayon::prelude::*;
11
12/// Re-exported partition planning types used by the bounded parallel map.
13pub use adze_concurrency_plan_core::{ParallelPartitionPlan, normalized_concurrency};
14
15/// Run a bounded parallel map operation.
16///
17/// This keeps work partitioned by `concurrency`, while preserving all outputs.
18///
19/// # Examples
20///
21/// ```
22/// use adze_concurrency_bounded_map_core::bounded_parallel_map;
23///
24/// let input: Vec<i32> = (0..10).collect();
25/// let mut result = bounded_parallel_map(input, 4, |x| x * 2);
26/// result.sort();
27/// assert_eq!(result, vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18]);
28/// ```
29#[must_use]
30pub fn bounded_parallel_map<T, R, F>(items: Vec<T>, concurrency: usize, f: F) -> Vec<R>
31where
32    T: Send,
33    R: Send,
34    F: Fn(T) -> R + Send + Sync,
35{
36    let plan = ParallelPartitionPlan::for_item_count(items.len(), concurrency);
37
38    if items.is_empty() {
39        return Vec::new();
40    }
41
42    if plan.use_direct_parallel_iter {
43        return items.into_par_iter().map(f).collect();
44    }
45
46    items
47        .into_par_iter()
48        .chunks(plan.chunk_size)
49        .flat_map(|chunk| chunk.into_iter().map(&f).collect::<Vec<_>>())
50        .collect()
51}
52
#[cfg(test)]
mod tests {
    use super::*;

    // Zero is normalized up to one; positive values pass through unchanged.
    #[test]
    fn normalized_concurrency_is_never_zero() {
        for (requested, expected) in [(0, 1), (1, 1), (8, 8)] {
            assert_eq!(normalized_concurrency(requested), expected);
        }
    }

    // A requested concurrency of zero must still map every element.
    #[test]
    fn bounded_parallel_map_handles_zero_concurrency() {
        let input: Vec<i32> = (0..64).collect();
        let expected: Vec<i32> = (0..64).map(|x| x * 2).collect();

        let mut doubled = bounded_parallel_map(input, 0, |x| x * 2);
        doubled.sort_unstable();

        assert_eq!(doubled, expected);
    }

    // Empty input yields empty output.
    #[test]
    fn bounded_parallel_map_handles_empty_input() {
        let no_items = Vec::<i32>::new();
        let mapped: Vec<i32> = bounded_parallel_map(no_items, 8, |value| value * 2);
        assert!(mapped.is_empty());
    }

    // A single element is mapped exactly once.
    #[test]
    fn bounded_parallel_map_single_element() {
        let mapped = bounded_parallel_map(vec![7], 4, |x| x + 1);
        assert_eq!(mapped, vec![8]);
    }

    // Requesting more concurrency than there are items is fine.
    #[test]
    fn bounded_parallel_map_concurrency_exceeds_items() {
        let mut scaled = bounded_parallel_map(vec![1, 2, 3], 100, |x| x * 10);
        scaled.sort_unstable();
        assert_eq!(scaled, vec![10, 20, 30]);
    }

    // The output has exactly as many elements as the input.
    #[test]
    fn bounded_parallel_map_preserves_output_length() {
        let input: Vec<i32> = (0..100).collect();
        let expected_len = input.len();

        let mapped = bounded_parallel_map(input, 4, |x| x * 2);

        assert_eq!(mapped.len(), expected_len);
    }

    // Concurrency of one still processes every element.
    #[test]
    fn bounded_parallel_map_with_concurrency_one() {
        let mut squares = bounded_parallel_map(vec![3, 1, 4, 1, 5], 1, |x| x * x);
        squares.sort_unstable();
        assert_eq!(squares, vec![1, 1, 9, 16, 25]);
    }

    // The plan never reports a zero concurrency or chunk size.
    #[test]
    fn partition_plan_zero_concurrency_normalizes() {
        let plan = ParallelPartitionPlan::for_item_count(10, 0);
        assert!(plan.concurrency >= 1);
        assert!(plan.chunk_size >= 1);
    }

    // Planning for zero items selects the direct path with a usable chunk size.
    #[test]
    fn partition_plan_empty_items_is_safe() {
        let plan = ParallelPartitionPlan::for_item_count(0, 4);
        assert!(plan.use_direct_parallel_iter);
        assert!(plan.chunk_size >= 1);
    }
}
119}