avila_parallel/adaptive.rs

//! Adaptive parallel execution with dynamic optimization
//!
//! This module provides adaptive algorithms that adjust their behavior based on workload characteristics.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Instant;

/// Adaptive executor that learns optimal chunk sizes
pub struct AdaptiveExecutor {
    optimal_chunk_size: Arc<AtomicUsize>,
    sample_count: Arc<AtomicUsize>,
}

impl AdaptiveExecutor {
    /// Create a new adaptive executor
    pub fn new() -> Self {
        Self {
            optimal_chunk_size: Arc::new(AtomicUsize::new(1024)),
            sample_count: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Get current optimal chunk size
    pub fn chunk_size(&self) -> usize {
        self.optimal_chunk_size.load(Ordering::Relaxed)
    }

    /// Update chunk size based on performance
    fn update_chunk_size(&self, new_size: usize) {
        let samples = self.sample_count.fetch_add(1, Ordering::Relaxed);
        if samples < 10 {
            // Learning phase - take average
            let current = self.optimal_chunk_size.load(Ordering::Relaxed);
            let avg = (current + new_size) / 2;
            self.optimal_chunk_size.store(avg, Ordering::Relaxed);
        }
    }

    /// Adaptive parallel map that tunes the chunk-size estimate from observed run times
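    ///
    /// A minimal usage sketch; the crate name and module path `avila_parallel::adaptive`
    /// are assumed from the file layout:
    ///
    /// ```
    /// use avila_parallel::adaptive::AdaptiveExecutor;
    ///
    /// let executor = AdaptiveExecutor::new();
    /// let doubled = executor.adaptive_map(&[1, 2, 3], |x| x * 2);
    /// assert_eq!(doubled, vec![2, 4, 6]);
    /// // The current chunk-size estimate can be inspected between runs.
    /// let _hint = executor.chunk_size();
    /// ```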
    pub fn adaptive_map<T, R, F>(&self, items: &[T], f: F) -> Vec<R>
    where
        T: Sync,
        R: Send + 'static,
        F: Fn(&T) -> R + Send + Sync,
    {
        use crate::executor::parallel_map;

        let start = Instant::now();
        let result = parallel_map(items, f);
        let duration = start.elapsed();

        // Adapt the chunk-size estimate: long runs suggest smaller chunks,
        // very short runs suggest larger ones.
        if duration.as_millis() > 100 {
            self.update_chunk_size(self.chunk_size() / 2);
        } else if duration.as_millis() < 10 {
            self.update_chunk_size(self.chunk_size() * 2);
        }

        result
    }
}

impl Default for AdaptiveExecutor {
    fn default() -> Self {
        Self::new()
    }
}

/// Size-based execution strategy: small slices run sequentially, everything else
/// goes through the standard parallel map
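///
/// A minimal sketch; with fewer than 1000 items the call stays on the sequential
/// path (the crate name `avila_parallel` is assumed):
///
/// ```
/// use avila_parallel::adaptive::speculative_execute;
///
/// let tripled = speculative_execute(&[1, 2, 3], |x| x * 3);
/// assert_eq!(tripled, vec![3, 6, 9]);
/// ```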
pub fn speculative_execute<T, R, F>(items: &[T], f: F) -> Vec<R>
where
    T: Sync,
    R: Send + 'static,
    F: Fn(&T) -> R + Send + Sync,
{
    use crate::executor::parallel_map;

    // Small inputs are not worth the parallel dispatch overhead.
    if items.len() < 1000 {
        return items.iter().map(&f).collect();
    }

    // Everything else goes through the standard parallel map.
    parallel_map(items, f)
}

/// Hierarchical parallel map: recursively split the input into per-core chunks and
/// run the leaves through the standard parallel map
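///
/// A minimal sketch (crate name `avila_parallel` assumed); inputs below the internal
/// 1000-item threshold fall back to the sequential path regardless of `depth`:
///
/// ```
/// use avila_parallel::adaptive::hierarchical_map;
///
/// let shifted = hierarchical_map(&[1, 2, 3], 2, |x| x + 10);
/// assert_eq!(shifted, vec![11, 12, 13]);
/// ```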
pub fn hierarchical_map<T, R, F>(items: &[T], depth: usize, f: F) -> Vec<R>
where
    T: Sync,
    R: Send + 'static,
    F: Fn(&T) -> R + Send + Sync + Clone,
{
    use crate::executor::parallel_map;

    if depth == 0 || items.len() < 1000 {
        return items.iter().map(&f).collect();
    }

    // One chunk per available core; each chunk either recurses one level deeper
    // or is handed to the standard parallel map.
    let num_chunks = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);

    let chunk_size = (items.len() + num_chunks - 1) / num_chunks;

    let results: Vec<_> = items.chunks(chunk_size)
        .map(|chunk| {
            if depth > 1 {
                hierarchical_map(chunk, depth - 1, f.clone())
            } else {
                parallel_map(chunk, &f)
            }
        })
        .collect();

    results.into_iter().flatten().collect()
}

/// Cache-aware parallel execution (currently a thin wrapper over the standard parallel map)
pub fn cache_aware_map<T, R, F>(items: &[T], f: F) -> Vec<R>
where
    T: Sync,
    R: Send + 'static,
    F: Fn(&T) -> R + Send + Sync,
{
    // Cache-line-aware chunking is not implemented here; this simply delegates
    // to the standard parallel map.
    use crate::executor::parallel_map;
    parallel_map(items, f)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_adaptive_executor() {
        let executor = AdaptiveExecutor::new();
        let data: Vec<i32> = (1..=1000).collect();
        let results = executor.adaptive_map(&data, |x| x * 2);
        assert_eq!(results.len(), 1000);
    }
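
    // The chunk-size estimate starts at 1024 (set in `new`) before any timing
    // samples have been recorded.
    #[test]
    fn test_initial_chunk_size() {
        let executor = AdaptiveExecutor::new();
        assert_eq!(executor.chunk_size(), 1024);
    }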

    #[test]
    fn test_speculative_execute() {
        let data: Vec<i32> = vec![1, 2, 3, 4, 5];
        let results = speculative_execute(&data, |x| x * 2);
        assert_eq!(results, vec![2, 4, 6, 8, 10]);
    }
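
    // Exercises the parallel path (1000 items or more); checks length and an
    // order-independent sum rather than assuming exact ordering.
    #[test]
    fn test_speculative_execute_large_input() {
        let data: Vec<i32> = (1..=2000).collect();
        let results = speculative_execute(&data, |x| x + 1);
        assert_eq!(results.len(), 2000);
        assert_eq!(results.iter().sum::<i32>(), (2..=2001).sum::<i32>());
    }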

    #[test]
    fn test_hierarchical_map() {
        let data: Vec<i32> = (1..=100).collect();
        let results = hierarchical_map(&data, 2, |x| x * 2);
        assert_eq!(results.len(), 100);
    }
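
    // With depth 0 the function takes the sequential fallback path, so results
    // come back in input order.
    #[test]
    fn test_hierarchical_map_depth_zero() {
        let data: Vec<i32> = (1..=10).collect();
        let results = hierarchical_map(&data, 0, |x| x * 3);
        assert_eq!(results, vec![3, 6, 9, 12, 15, 18, 21, 24, 27, 30]);
    }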

    #[test]
    fn test_cache_aware_map() {
        let data: Vec<i32> = (1..=100).collect();
        let results = cache_aware_map(&data, |x| x * 2);
        assert_eq!(results.len(), 100);
    }
}