use std::thread;

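/// Applies `f` to every element of `items` in place, splitting the slice into
/// one chunk per available core and handing each chunk to a scoped thread.
/// Small inputs that fit in a single chunk are processed sequentially.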
pub fn parallel_transform_inplace<T, F>(items: &mut [T], f: F)
where
    T: Send,
    F: Fn(&mut T) + Send + Sync,
{
    let len = items.len();
    if len == 0 {
        return;
    }

    let num_threads = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    let chunk_size = (len + num_threads - 1) / num_threads;

    // Everything fits in one chunk: run sequentially on the calling thread.
    if chunk_size >= len {
        items.iter_mut().for_each(|item| f(item));
        return;
    }

    // Scoped threads may borrow from the caller's stack frame, so each worker
    // gets a disjoint mutable chunk of `items` and no copying is required.
    let f = &f;
    thread::scope(|s| {
        for chunk in items.chunks_mut(chunk_size) {
            s.spawn(move || {
                chunk.iter_mut().for_each(|item| f(item));
            });
        }
    });
}

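/// Folds `items` in parallel: each chunk is reduced on its own scoped thread
/// with `fold_op`, starting from a fresh `identity()`, and the per-chunk
/// results are merged left to right with `combine`. For the result to be
/// independent of the chunking, `identity` should produce a neutral value and
/// `combine` should be associative.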
pub fn parallel_fold_efficient<T, R, F, G, C>(items: &[T], identity: G, fold_op: F, combine: C) -> R
where
    T: Sync,
    R: Send + 'static,
    F: Fn(R, &T) -> R + Send + Sync + Clone,
    G: Fn() -> R + Send + Sync + Clone,
    C: Fn(R, R) -> R + Send + Sync,
{
    let len = items.len();
    if len == 0 {
        return identity();
    }

    let num_threads = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4);
    let chunk_size = (len + num_threads - 1) / num_threads;

    // Fold each chunk on its own scoped thread, then join the handles in chunk
    // order so the partial results are combined deterministically.
    thread::scope(|s| {
        let handles: Vec<_> = items
            .chunks(chunk_size)
            .map(|chunk| {
                let identity = identity.clone();
                let fold_op = fold_op.clone();
                s.spawn(move || chunk.iter().fold(identity(), |acc, item| fold_op(acc, item)))
            })
            .collect();

        handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .fold(identity(), |acc, partial| combine(acc, partial))
    })
}

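/// Calls `f` on a shared reference to every element, distributing chunks of
/// the slice across scoped threads without cloning or copying any element.
/// Small inputs that fit in a single chunk are processed sequentially.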
pub fn parallel_iter_nocopy<T, F>(items: &[T], f: F)
where
    T: Sync,
    F: Fn(&T) + Send + Sync,
{
    let len = items.len();
    if len == 0 {
        return;
    }

    let num_threads = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    let chunk_size = (len + num_threads - 1) / num_threads;

    // Everything fits in one chunk: run sequentially on the calling thread.
    if chunk_size >= len {
        items.iter().for_each(&f);
        return;
    }

    // `F: Sync` lets every scoped thread share a plain reference to `f`, so no
    // Arc is needed: the scope guarantees the threads finish before `f` drops.
    let f = &f;
    thread::scope(|s| {
        for chunk in items.chunks(chunk_size) {
            s.spawn(move || {
                chunk.iter().for_each(|item| f(item));
            });
        }
    });
}

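/// Maps `items` through `f` using the crate's `parallel_map` executor and
/// returns the results as an iterator. Assuming `parallel_map` yields a fully
/// collected container (such as a `Vec`), the output is materialized up front
/// rather than streamed lazily.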
pub fn streaming_parallel_map<T, R, F>(items: &[T], f: F) -> impl Iterator<Item = R>
where
    T: Sync + Clone,
    R: Send + 'static,
    F: Fn(&T) -> R + Send + Sync + 'static,
{
    use crate::executor::parallel_map;
    parallel_map(items, f).into_iter()
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    #[test]
    fn test_parallel_transform_inplace() {
        let mut data = vec![1, 2, 3, 4, 5];
        parallel_transform_inplace(&mut data, |x| *x *= 2);
        assert_eq!(data, vec![2, 4, 6, 8, 10]);
    }
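
    // Added edge-case check: an empty slice should be left untouched, not panic.
    #[test]
    fn test_parallel_transform_inplace_empty() {
        let mut data: Vec<i32> = Vec::new();
        parallel_transform_inplace(&mut data, |x| *x *= 2);
        assert!(data.is_empty());
    }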

    #[test]
    fn test_parallel_fold_efficient() {
        let data: Vec<i32> = vec![1, 2, 3, 4, 5];
        let sum = parallel_fold_efficient(
            &data,
            || 0,
            |acc, x| acc + x,
            |a, b| a + b,
        );
        assert_eq!(sum, 15);
    }
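
    // Added edge-case check: folding an empty slice should return the identity value.
    #[test]
    fn test_parallel_fold_efficient_empty() {
        let data: Vec<i32> = Vec::new();
        let sum = parallel_fold_efficient(&data, || 0, |acc, x| acc + x, |a, b| a + b);
        assert_eq!(sum, 0);
    }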

    #[test]
    fn test_parallel_iter_nocopy() {
        let data = vec![1, 2, 3, 4, 5];
        let counter = Arc::new(Mutex::new(0));
        let counter_clone = Arc::clone(&counter);

        parallel_iter_nocopy(&data, |_| {
            *counter_clone.lock().unwrap() += 1;
        });

        assert_eq!(*counter.lock().unwrap(), 5);
    }
}