avx_parallel/
memory.rs

1//! Memory-efficient parallel operations with minimal allocations
2//!
3//! This module provides operations optimized for memory usage
4
5use std::thread;
6use std::sync::{Arc, Mutex};
7
/// Applies `f` to every element of `items` in place, spreading the work
/// across scoped worker threads.
///
/// The slice is split into contiguous, non-overlapping chunks (at most one per
/// core reported by [`thread::available_parallelism`], falling back to a
/// single chunk if that query fails). Each chunk is handed to its own scoped
/// thread, so no element is ever visited twice and no `unsafe` is required.
///
/// When the whole slice fits into a single chunk the work is done on the
/// calling thread and no thread is spawned. An empty slice is a no-op.
///
/// `f` may be invoked concurrently from several threads, so any state it
/// captures must tolerate that (this is what the `Sync` bound expresses).
///
/// # Panics
///
/// Panics if `f` panics on any element; the remaining workers are still
/// joined before the panic is propagated.
pub fn parallel_transform_inplace<T, F>(items: &mut [T], f: F)
where
    T: Send,
    F: Fn(&mut T) + Send + Sync,
{
    let len = items.len();
    if len == 0 {
        return;
    }

    let num_threads = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    // Ceiling division: the last chunk may be shorter, none is ever empty.
    let chunk_size = (len + num_threads - 1) / num_threads;

    // A single chunk gains nothing from a worker thread.
    if chunk_size >= len {
        items.iter_mut().for_each(&f);
        return;
    }

    // Workers share the closure by reference; the scope guarantees they all
    // finish before `f` and the borrow of `items` go away.
    let f = &f;
    thread::scope(|s| {
        for chunk in items.chunks_mut(chunk_size) {
            s.spawn(move || chunk.iter_mut().for_each(f));
        }
    });
}
38
/// Folds `items` in parallel and merges the per-chunk results in slice order.
///
/// The slice is split into contiguous chunks (at most one per core reported by
/// [`thread::available_parallelism`]; 4 is assumed if that query fails). Each
/// chunk is folded on its own scoped thread, starting from `identity()` and
/// applying `fold_op` to every element. The partial results are then merged
/// left to right with `combine`, starting from one more `identity()`.
///
/// Partial results are merged in chunk order, not in thread-completion order,
/// so the result is deterministic. `combine` therefore only has to be
/// associative with `identity()` as its neutral element; it does not have to
/// be commutative.
///
/// Returns `identity()` for an empty slice. When the slice fits into a single
/// chunk it is folded on the calling thread.
///
/// # Panics
///
/// If a worker panics, the panic payload of the first such worker (in chunk
/// order) is re-raised on the calling thread.
pub fn parallel_fold_efficient<T, R, F, G, C>(items: &[T], identity: G, fold_op: F, combine: C) -> R
where
    T: Sync + Send,
    R: Send,
    F: Fn(R, &T) -> R + Send + Sync,
    G: Fn() -> R + Send + Sync,
    C: Fn(R, R) -> R + Send + Sync,
{
    let len = items.len();
    if len == 0 {
        return identity();
    }

    let num_threads = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4);
    // Ceiling division: the last chunk may be shorter, none is ever empty.
    let chunk_size = (len + num_threads - 1) / num_threads;

    // Scoped workers can borrow the closures, so no cloning or `Arc` is needed.
    let identity = &identity;
    let fold_op = &fold_op;

    // A single chunk gains nothing from a worker thread; the result is the
    // same as merging one partial into a fresh identity.
    if chunk_size >= len {
        let folded = items.iter().fold(identity(), |acc, item| fold_op(acc, item));
        return combine(identity(), folded);
    }

    let partials: Vec<R> = thread::scope(|s| {
        let mut handles = Vec::with_capacity((len + chunk_size - 1) / chunk_size);
        for chunk in items.chunks(chunk_size) {
            handles.push(s.spawn(move || {
                chunk.iter().fold(identity(), |acc, item| fold_op(acc, item))
            }));
        }

        // Joining in spawn order is what keeps the partials in slice order.
        handles
            .into_iter()
            .map(|handle| match handle.join() {
                Ok(partial) => partial,
                Err(payload) => std::panic::resume_unwind(payload),
            })
            .collect()
    });

    partials
        .into_iter()
        .fold(identity(), |acc, partial| combine(acc, partial))
}
80
/// Calls `f` on every element of `items` by reference, without copying or
/// cloning any element.
///
/// The slice is divided into contiguous chunks (at most one per core reported
/// by [`thread::available_parallelism`], or a single chunk if that query
/// fails) and each chunk is visited on its own scoped thread. If everything
/// fits into one chunk the elements are visited on the calling thread instead.
/// An empty slice is a no-op.
///
/// `f` may run concurrently on several threads; no ordering between calls
/// made from different chunks is guaranteed.
pub fn parallel_iter_nocopy<T, F>(items: &[T], f: F)
where
    T: Sync,
    F: Fn(&T) + Send + Sync,
{
    if items.is_empty() {
        return;
    }

    let total = items.len();
    let workers = match thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    };
    // Ceiling division so that `workers` chunks always cover the slice.
    let per_worker = (total + workers - 1) / workers;

    if per_worker < total {
        // One shared handle to the callback; each worker holds its own clone.
        let shared = Arc::new(f);

        thread::scope(|scope| {
            for part in items.chunks(per_worker) {
                let visit = Arc::clone(&shared);
                scope.spawn(move || {
                    for item in part {
                        visit(item);
                    }
                });
            }
        });
    } else {
        // Only one chunk: stay on the calling thread.
        for item in items {
            f(item);
        }
    }
}
113
114/// Streaming parallel map (process as results come)
115pub fn streaming_parallel_map<T, R, F>(items: &[T], f: F) -> impl Iterator<Item = R>
116where
117    T: Sync + Clone,
118    R: Send + 'static,
119    F: Fn(&T) -> R + Send + Sync + 'static,
120{
121    use crate::executor::parallel_map;
122    parallel_map(items, f).into_iter()
123}
124
/// Unit tests for the helpers in this module.
///
/// Each helper is exercised with a tiny input, an empty input, and an input
/// that is larger than any realistic core count so that the slice is split
/// into several chunks on multi-core machines.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parallel_transform_inplace() {
        let mut data = vec![1, 2, 3, 4, 5];
        parallel_transform_inplace(&mut data, |x| *x *= 2);
        assert_eq!(data, vec![2, 4, 6, 8, 10]);
    }

    #[test]
    fn test_parallel_transform_inplace_empty() {
        let mut data: Vec<i32> = Vec::new();
        parallel_transform_inplace(&mut data, |x| *x *= 2);
        assert!(data.is_empty());
    }

    #[test]
    fn test_parallel_transform_inplace_large() {
        let mut data: Vec<u64> = (0..10_000).collect();
        parallel_transform_inplace(&mut data, |x| *x += 1);
        let expected: Vec<u64> = (1..=10_000).collect();
        assert_eq!(data, expected);
    }

    #[test]
    fn test_parallel_fold_efficient() {
        let data: Vec<i32> = vec![1, 2, 3, 4, 5];
        // Test fold with separate combine operation
        let sum = parallel_fold_efficient(
            &data,
            || 0,
            |acc, x| acc + x,
            |a, b| a + b
        );
        assert_eq!(sum, 15);
    }

    #[test]
    fn test_parallel_fold_efficient_empty() {
        let data: Vec<i32> = Vec::new();
        // An empty slice must yield the identity value untouched.
        let result = parallel_fold_efficient(
            &data,
            || 7i32,
            |acc: i32, x: &i32| acc + *x,
            |a: i32, b: i32| a + b,
        );
        assert_eq!(result, 7);
    }

    #[test]
    fn test_parallel_fold_efficient_large() {
        let data: Vec<u64> = (1..=1000).collect();
        let sum = parallel_fold_efficient(
            &data,
            || 0u64,
            |acc: u64, x: &u64| acc + *x,
            |a: u64, b: u64| a + b,
        );
        assert_eq!(sum, 500_500);
    }

    #[test]
    fn test_parallel_iter_nocopy() {
        let data = vec![1, 2, 3, 4, 5];
        let counter = Arc::new(Mutex::new(0));
        let counter_clone = Arc::clone(&counter);

        parallel_iter_nocopy(&data, |_| {
            *counter_clone.lock().unwrap() += 1;
        });

        assert_eq!(*counter.lock().unwrap(), 5);
    }

    #[test]
    fn test_parallel_iter_nocopy_empty() {
        let data: Vec<i32> = Vec::new();
        let counter = Mutex::new(0usize);

        parallel_iter_nocopy(&data, |_| {
            *counter.lock().unwrap() += 1;
        });

        // The callback must never run for an empty slice.
        assert_eq!(*counter.lock().unwrap(), 0);
    }

    #[test]
    fn test_parallel_iter_nocopy_large() {
        let data: Vec<u64> = (1..=1000).collect();
        let visited = Mutex::new((0usize, 0u64));

        parallel_iter_nocopy(&data, |x| {
            let mut guard = visited.lock().unwrap();
            guard.0 += 1;
            guard.1 += *x;
        });

        // Every element is seen exactly once regardless of chunking.
        assert_eq!(*visited.lock().unwrap(), (1000, 500_500));
    }
}
162
163
164
165
166