adze_concurrency_bounded_map_core/
lib.rs1#![forbid(unsafe_op_in_unsafe_fn)]
4#![deny(missing_docs)]
5#![cfg_attr(feature = "strict_api", deny(unreachable_pub))]
6#![cfg_attr(not(feature = "strict_api"), warn(unreachable_pub))]
7#![cfg_attr(feature = "strict_docs", deny(missing_docs))]
8#![cfg_attr(not(feature = "strict_docs"), allow(missing_docs))]
9
10use rayon::prelude::*;
11
12pub use adze_concurrency_plan_core::{ParallelPartitionPlan, normalized_concurrency};
14
15#[must_use]
30pub fn bounded_parallel_map<T, R, F>(items: Vec<T>, concurrency: usize, f: F) -> Vec<R>
31where
32 T: Send,
33 R: Send,
34 F: Fn(T) -> R + Send + Sync,
35{
36 let plan = ParallelPartitionPlan::for_item_count(items.len(), concurrency);
37
38 if items.is_empty() {
39 return Vec::new();
40 }
41
42 if plan.use_direct_parallel_iter {
43 return items.into_par_iter().map(f).collect();
44 }
45
46 items
47 .into_par_iter()
48 .chunks(plan.chunk_size)
49 .flat_map(|chunk| chunk.into_iter().map(&f).collect::<Vec<_>>())
50 .collect()
51}
52
#[cfg(test)]
mod tests {
    use super::*;

    /// A request of zero is raised to one; positive requests are unchanged.
    #[test]
    fn normalized_concurrency_is_never_zero() {
        for (requested, expected) in [(0, 1), (1, 1), (8, 8)] {
            assert_eq!(normalized_concurrency(requested), expected);
        }
    }

    /// A concurrency of zero must still map every item.
    #[test]
    fn bounded_parallel_map_handles_zero_concurrency() {
        let input: Vec<i32> = (0..64).collect();

        let mut doubled = bounded_parallel_map(input, 0, |x| x * 2);
        doubled.sort_unstable();

        // The even numbers 0, 2, ..., 126 are exactly the doubled inputs.
        let expected: Vec<i32> = (0..128).step_by(2).collect();
        assert_eq!(doubled, expected);
    }

    /// An empty input produces an empty output.
    #[test]
    fn bounded_parallel_map_handles_empty_input() {
        let mapped: Vec<i32> = bounded_parallel_map(Vec::<i32>::new(), 8, |value| value * 2);
        assert_eq!(mapped.len(), 0);
    }

    /// A single item is mapped exactly once.
    #[test]
    fn bounded_parallel_map_single_element() {
        assert_eq!(bounded_parallel_map(vec![7], 4, |x| x + 1), [8]);
    }

    /// Requesting more concurrency than there are items is harmless.
    #[test]
    fn bounded_parallel_map_concurrency_exceeds_items() {
        let mut scaled = bounded_parallel_map(vec![1, 2, 3], 100, |x| x * 10);
        scaled.sort_unstable();
        assert_eq!(scaled, [10, 20, 30]);
    }

    /// The output holds one result per input item.
    #[test]
    fn bounded_parallel_map_preserves_output_length() {
        let input: Vec<i32> = (0..100).collect();
        let expected_len = input.len();

        let mapped = bounded_parallel_map(input, 4, |x| x * 2);
        assert_eq!(mapped.len(), expected_len);
    }

    /// A concurrency of one still maps every item.
    #[test]
    fn bounded_parallel_map_with_concurrency_one() {
        let mut squares = bounded_parallel_map(vec![3, 1, 4, 1, 5], 1, |x| x * x);
        squares.sort_unstable();
        assert_eq!(squares, [1, 1, 9, 16, 25]);
    }

    /// Planning with zero concurrency yields non-zero concurrency and chunk size.
    #[test]
    fn partition_plan_zero_concurrency_normalizes() {
        let plan = ParallelPartitionPlan::for_item_count(10, 0);
        assert!(plan.concurrency > 0);
        assert!(plan.chunk_size > 0);
    }

    /// Planning for zero items selects the direct path with a non-zero chunk size.
    #[test]
    fn partition_plan_empty_items_is_safe() {
        let plan = ParallelPartitionPlan::for_item_count(0, 4);
        assert!(plan.chunk_size > 0);
        assert!(plan.use_direct_parallel_iter);
    }
}