avila_parallel/adaptive.rs
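//! Adaptive, speculative, hierarchical, and cache-aware map helpers built on
//! top of the crate's `parallel_map` executor.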
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Instant;

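/// Tracks a running estimate of a good chunk size, refined from timing
/// samples collected by `adaptive_map`.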
pub struct AdaptiveExecutor {
    optimal_chunk_size: Arc<AtomicUsize>,
    sample_count: Arc<AtomicUsize>,
}

impl AdaptiveExecutor {
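    /// Creates an executor with an initial chunk-size estimate of 1024 items.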
    pub fn new() -> Self {
        Self {
            optimal_chunk_size: Arc::new(AtomicUsize::new(1024)),
            sample_count: Arc::new(AtomicUsize::new(0)),
        }
    }

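    /// Returns the current chunk-size estimate.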
    pub fn chunk_size(&self) -> usize {
        self.optimal_chunk_size.load(Ordering::Relaxed)
    }

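    /// Averages a proposed size into the estimate, but only for the first ten
    /// samples; later proposals are ignored.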
    fn update_chunk_size(&self, new_size: usize) {
        let samples = self.sample_count.fetch_add(1, Ordering::Relaxed);
        if samples < 10 {
            let current = self.optimal_chunk_size.load(Ordering::Relaxed);
            let avg = (current + new_size) / 2;
            self.optimal_chunk_size.store(avg, Ordering::Relaxed);
        }
    }

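    /// Runs `parallel_map` over `items`, timing the call and feeding a
    /// smaller chunk-size proposal into the estimate if it took more than
    /// 100 ms, or a larger one if it finished in under 10 ms. The estimate is
    /// not currently passed on to `parallel_map` itself.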
    pub fn adaptive_map<T, R, F>(&self, items: &[T], f: F) -> Vec<R>
    where
        T: Sync,
        R: Send + 'static,
        F: Fn(&T) -> R + Send + Sync,
    {
        use crate::executor::parallel_map;

        let start = Instant::now();
        let result = parallel_map(items, f);
        let duration = start.elapsed();

        if duration.as_millis() > 100 {
            self.update_chunk_size(self.chunk_size() / 2);
        } else if duration.as_millis() < 10 {
            self.update_chunk_size(self.chunk_size() * 2);
        }

        result
    }
}

impl Default for AdaptiveExecutor {
    fn default() -> Self {
        Self::new()
    }
}

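/// Picks an execution strategy based on input length. Despite the name, no
/// speculation happens yet: inputs with fewer than 1000 items are mapped
/// sequentially and everything else goes through `parallel_map`, so the
/// `Clone` and `PartialEq` bounds are not currently exercised.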
pub fn speculative_execute<T, R, F>(items: &[T], f: F) -> Vec<R>
where
    T: Sync + Clone,
    R: Send + 'static + PartialEq,
    F: Fn(&T) -> R + Send + Sync + Clone,
{
    use crate::executor::parallel_map;

    let len = items.len();

    if len < 1000 {
        return items.iter().map(&f).collect();
    }

    // Medium (< 100_000 items) and large inputs currently share the same
    // parallel path.
    parallel_map(items, f)
}

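/// Recursively splits `items` into one chunk per available core at each
/// level, recursing until `depth` reaches 1 and then handing each chunk to
/// `parallel_map`. Inputs with fewer than 1000 items, or `depth == 0`, are
/// mapped sequentially.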
pub fn hierarchical_map<T, R, F>(items: &[T], depth: usize, f: F) -> Vec<R>
where
    T: Sync,
    R: Send + 'static,
    F: Fn(&T) -> R + Send + Sync + Clone,
{
    use crate::executor::parallel_map;

    if depth == 0 || items.len() < 1000 {
        return items.iter().map(&f).collect();
    }

    let num_chunks = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);

    // Ceiling division so every item is covered even when the length is not
    // an exact multiple of the chunk count.
    let chunk_size = (items.len() + num_chunks - 1) / num_chunks;

    let results: Vec<_> = items.chunks(chunk_size)
        .map(|chunk| {
            if depth > 1 {
                hierarchical_map(chunk, depth - 1, f.clone())
            } else {
                parallel_map(chunk, &f)
            }
        })
        .collect();

    results.into_iter().flatten().collect()
}

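/// Currently delegates directly to `parallel_map`; no cache-aware chunking is
/// implemented yet.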
pub fn cache_aware_map<T, R, F>(items: &[T], f: F) -> Vec<R>
where
    T: Sync,
    R: Send + 'static,
    F: Fn(&T) -> R + Send + Sync,
{
    use crate::executor::parallel_map;
    parallel_map(items, f)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_adaptive_executor() {
        let executor = AdaptiveExecutor::new();
        let data: Vec<i32> = (1..=1000).collect();
        let results = executor.adaptive_map(&data, |x| x * 2);
        assert_eq!(results.len(), 1000);
    }

    #[test]
    fn test_speculative_execute() {
        let data: Vec<i32> = vec![1, 2, 3, 4, 5];
        let results = speculative_execute(&data, |x| x * 2);
        assert_eq!(results, vec![2, 4, 6, 8, 10]);
    }

    #[test]
    fn test_hierarchical_map() {
        let data: Vec<i32> = (1..=100).collect();
        let results = hierarchical_map(&data, 2, |x| x * 2);
        assert_eq!(results.len(), 100);
    }

    #[test]
    fn test_cache_aware_map() {
        let data: Vec<i32> = (1..=100).collect();
        let results = cache_aware_map(&data, |x| x * 2);
        assert_eq!(results.len(), 100);
    }
}