rust_prelude_plus/
parallel.rs

1//! Parallel operations for keypath-based functional programming
2
3#[cfg(feature = "parallel")]
4use {
5    rayon::prelude::*,
6    key_paths_core::KeyPaths,
7    crate::error::{KeyPathResult, KeyPathError},
8    crate::traits::KeyPathsOperable,
9};
10
#[cfg(feature = "parallel")]
/// Parallel keypath operations for collections.
///
/// Every function in this module consumes its input `Vec`, resolves `keypath`
/// on each element from a rayon parallel iterator, and reports a keypath that
/// cannot be resolved as an `Err` value rather than panicking inside a worker
/// thread.
pub mod parallel_collections {
    use super::*;

    /// Builds the error returned when `operation` fails to resolve its keypath.
    ///
    /// NOTE(review): the error produced by `get_at_keypath` is discarded by the
    /// callers (its concrete type is not visible from this module); if it is
    /// already a `KeyPathError`, forwarding it unchanged would keep more detail.
    fn access_error(operation: &str) -> KeyPathError {
        KeyPathError::ParallelError {
            message: format!("KeyPath access failed in {}", operation),
        }
    }

    /// Parallel map over collection with keypath.
    ///
    /// Applies `f` to the value found at `keypath` in every element and returns
    /// the results in the order of the input elements.
    ///
    /// # Errors
    ///
    /// Returns `KeyPathError::ParallelError` if the keypath cannot be resolved
    /// on any element.
    pub fn par_map_keypath<T, V, F, R>(
        collection: Vec<T>,
        keypath: KeyPaths<T, V>,
        f: F,
    ) -> KeyPathResult<Vec<R>>
    where
        T: Send + Sync + KeyPathsOperable,
        V: Send + Sync,
        KeyPaths<T, V>: Send + Sync,
        F: Fn(&V) -> R + Send + Sync,
        R: Send,
    {
        collection
            .into_par_iter()
            .map(|item| {
                let mapped = item
                    .get_at_keypath(&keypath)
                    .map(&f)
                    .map_err(|_| access_error("par_map_keypath"));
                mapped
            })
            // Collecting into `Result` stops at an error instead of unwinding.
            .collect()
    }

    /// Parallel filter by keypath predicate.
    ///
    /// Keeps the elements whose value at `keypath` satisfies `predicate`,
    /// preserving their relative order.
    ///
    /// # Errors
    ///
    /// Returns `KeyPathError::ParallelError` if the keypath cannot be resolved
    /// on any element.
    pub fn par_filter_by_keypath<T, V, F>(
        collection: Vec<T>,
        keypath: KeyPaths<T, V>,
        predicate: F,
    ) -> KeyPathResult<Vec<T>>
    where
        T: Send + Sync + KeyPathsOperable,
        V: Send + Sync,
        KeyPaths<T, V>: Send + Sync,
        F: Fn(&V) -> bool + Send + Sync,
    {
        collection
            .into_par_iter()
            .filter_map(|item| {
                // Reduce to an owned verdict first so the borrow of `item`
                // has ended before the element is moved into the output.
                let verdict: KeyPathResult<bool> = item
                    .get_at_keypath(&keypath)
                    .map(&predicate)
                    .map_err(|_| access_error("par_filter_by_keypath"));
                match verdict {
                    Ok(true) => Some(Ok(item)),
                    Ok(false) => None,
                    Err(error) => Some(Err(error)),
                }
            })
            .collect()
    }

    /// Parallel find by keypath predicate.
    ///
    /// Returns some element whose value at `keypath` satisfies `predicate`, or
    /// `None` when no element matches. When several elements match, which one
    /// is returned is unspecified (rayon `find_any` semantics).
    ///
    /// # Errors
    ///
    /// Returns `KeyPathError::ParallelError` if a keypath failure is observed
    /// before a match is found.
    pub fn par_find_by_keypath<T, V, F>(
        collection: Vec<T>,
        keypath: KeyPaths<T, V>,
        predicate: F,
    ) -> KeyPathResult<Option<T>>
    where
        T: Send + Sync + KeyPathsOperable,
        V: Send + Sync,
        KeyPaths<T, V>: Send + Sync,
        F: Fn(&V) -> bool + Send + Sync,
    {
        collection
            .into_par_iter()
            .filter_map(|item| {
                let verdict: KeyPathResult<bool> = item
                    .get_at_keypath(&keypath)
                    .map(&predicate)
                    .map_err(|_| access_error("par_find_by_keypath"));
                match verdict {
                    Ok(true) => Some(Ok(item)),
                    Ok(false) => None,
                    Err(error) => Some(Err(error)),
                }
            })
            // Only matches and failures survive the filter; take whichever
            // shows up first and let the search short-circuit.
            .find_any(|_| true)
            .transpose()
    }

    /// Parallel collect keypath values.
    ///
    /// Clones the value at `keypath` out of every element, in input order.
    ///
    /// # Errors
    ///
    /// Returns `KeyPathError::ParallelError` if the keypath cannot be resolved
    /// on any element.
    pub fn par_collect_keypath<T, V>(
        collection: Vec<T>,
        keypath: KeyPaths<T, V>,
    ) -> KeyPathResult<Vec<V>>
    where
        T: Send + Sync + KeyPathsOperable,
        V: Send + Sync + Clone,
        KeyPaths<T, V>: Send + Sync,
    {
        collection
            .into_par_iter()
            .map(|item| {
                let cloned = item
                    .get_at_keypath(&keypath)
                    .cloned()
                    .map_err(|_| access_error("par_collect_keypath"));
                cloned
            })
            .collect()
    }

    /// Parallel count by keypath predicate.
    ///
    /// Counts the elements whose value at `keypath` satisfies `predicate`.
    ///
    /// # Errors
    ///
    /// Returns `KeyPathError::ParallelError` if the keypath cannot be resolved
    /// on any element.
    pub fn par_count_by_keypath<T, V, F>(
        collection: Vec<T>,
        keypath: KeyPaths<T, V>,
        predicate: F,
    ) -> KeyPathResult<usize>
    where
        T: Send + Sync + KeyPathsOperable,
        V: Send + Sync,
        KeyPaths<T, V>: Send + Sync,
        F: Fn(&V) -> bool + Send + Sync,
    {
        collection
            .into_par_iter()
            .map(|item| {
                let hit = item
                    .get_at_keypath(&keypath)
                    .map(|value| usize::from(predicate(value)))
                    .map_err(|_| access_error("par_count_by_keypath"));
                hit
            })
            // Sum the 0/1 hits; `try_reduce` short-circuits on the first error.
            .try_reduce(|| 0usize, |left, right| Ok(left + right))
    }

    /// Parallel any by keypath predicate.
    ///
    /// Returns `true` if at least one element's value at `keypath` satisfies
    /// `predicate`; an empty collection yields `false`.
    ///
    /// # Errors
    ///
    /// Returns `KeyPathError::ParallelError` if a keypath failure is observed
    /// before a satisfying element is found.
    pub fn par_any_by_keypath<T, V, F>(
        collection: Vec<T>,
        keypath: KeyPaths<T, V>,
        predicate: F,
    ) -> KeyPathResult<bool>
    where
        T: Send + Sync + KeyPathsOperable,
        V: Send + Sync,
        KeyPaths<T, V>: Send + Sync,
        F: Fn(&V) -> bool + Send + Sync,
    {
        let decisive = collection
            .into_par_iter()
            .map(|item| {
                let verdict: KeyPathResult<bool> = item
                    .get_at_keypath(&keypath)
                    .map(&predicate)
                    .map_err(|_| access_error("par_any_by_keypath"));
                verdict
            })
            // `Ok(false)` never decides the answer; a hit or an error does.
            .find_any(|verdict| !matches!(verdict, Ok(false)));
        decisive.unwrap_or(Ok(false))
    }

    /// Parallel all by keypath predicate.
    ///
    /// Returns `true` if every element's value at `keypath` satisfies
    /// `predicate`; an empty collection yields `true`.
    ///
    /// # Errors
    ///
    /// Returns `KeyPathError::ParallelError` if a keypath failure is observed
    /// before a non-satisfying element is found.
    pub fn par_all_by_keypath<T, V, F>(
        collection: Vec<T>,
        keypath: KeyPaths<T, V>,
        predicate: F,
    ) -> KeyPathResult<bool>
    where
        T: Send + Sync + KeyPathsOperable,
        V: Send + Sync,
        KeyPaths<T, V>: Send + Sync,
        F: Fn(&V) -> bool + Send + Sync,
    {
        let decisive = collection
            .into_par_iter()
            .map(|item| {
                let verdict: KeyPathResult<bool> = item
                    .get_at_keypath(&keypath)
                    .map(&predicate)
                    .map_err(|_| access_error("par_all_by_keypath"));
                verdict
            })
            // `Ok(true)` never decides the answer; a miss or an error does.
            .find_any(|verdict| !matches!(verdict, Ok(true)));
        decisive.unwrap_or(Ok(true))
    }
}
180
#[cfg(feature = "parallel")]
/// Keypath operations that run on a caller-supplied rayon thread pool
/// instead of the global one.
pub mod parallel_pools {
    use super::*;
    use rayon::{ThreadPool, ThreadPoolBuilder};

    /// Builds a rayon thread pool configured with `num_threads` threads.
    ///
    /// # Errors
    ///
    /// Returns `KeyPathError::ParallelError` carrying the builder's error text
    /// when the pool cannot be built.
    pub fn create_keypath_thread_pool(num_threads: usize) -> Result<ThreadPool, KeyPathError> {
        let builder = ThreadPoolBuilder::new().num_threads(num_threads);
        match builder.build() {
            Ok(pool) => Ok(pool),
            Err(build_error) => Err(KeyPathError::ParallelError {
                message: format!("Failed to create thread pool: {}", build_error),
            }),
        }
    }

    /// Runs `parallel_collections::par_map_keypath` inside `pool`.
    ///
    /// The collection, keypath and operation are handed to the mapping
    /// function unchanged; its result is returned as-is.
    pub fn execute_on_pool<T, V, F, R>(
        pool: &ThreadPool,
        collection: Vec<T>,
        keypath: KeyPaths<T, V>,
        operation: F,
    ) -> KeyPathResult<Vec<R>>
    where
        T: Send + Sync + KeyPathsOperable,
        V: Send + Sync,
        KeyPaths<T, V>: Send + Sync,
        F: Fn(&V) -> R + Send + Sync,
        R: Send,
    {
        // The job owns its inputs so it can be shipped to the pool.
        let map_job = move || parallel_collections::par_map_keypath(collection, keypath, operation);
        pool.install(map_job)
    }

    /// Runs `parallel_collections::par_filter_by_keypath` inside `pool`.
    ///
    /// The collection, keypath and predicate are handed to the filtering
    /// function unchanged; its result is returned as-is.
    pub fn filter_on_pool<T, V, F>(
        pool: &ThreadPool,
        collection: Vec<T>,
        keypath: KeyPaths<T, V>,
        predicate: F,
    ) -> KeyPathResult<Vec<T>>
    where
        T: Send + Sync + KeyPathsOperable,
        V: Send + Sync,
        KeyPaths<T, V>: Send + Sync,
        F: Fn(&V) -> bool + Send + Sync,
    {
        // The job owns its inputs so it can be shipped to the pool.
        let filter_job =
            move || parallel_collections::par_filter_by_keypath(collection, keypath, predicate);
        pool.install(filter_job)
    }
}
233}