1#[cfg(feature = "parallel")]
4use {
5 rayon::prelude::*,
6 key_paths_core::KeyPaths,
7 crate::error::{KeyPathResult, KeyPathError},
8 crate::traits::KeyPathsOperable,
9};
10
11#[cfg(feature = "parallel")]
12pub mod parallel_collections {
14 use super::*;
15
16 pub fn par_map_keypath<T, V, F, R>(
18 collection: Vec<T>,
19 keypath: KeyPaths<T, V>,
20 f: F,
21 ) -> KeyPathResult<Vec<R>>
22 where
23 T: Send + Sync + KeyPathsOperable,
24 V: Send + Sync,
25 KeyPaths<T, V>: Send + Sync,
26 F: Fn(&V) -> R + Send + Sync,
27 R: Send,
28 {
29 let result: Vec<R> = collection
30 .into_par_iter()
31 .map(|item| {
32 let value = item.get_at_keypath(&keypath).unwrap_or_else(|_| {
33 panic!("KeyPath access failed in par_map_keypath")
34 });
35 f(value)
36 })
37 .collect();
38 Ok(result)
39 }
40
41 pub fn par_filter_by_keypath<T, V, F>(
43 collection: Vec<T>,
44 keypath: KeyPaths<T, V>,
45 predicate: F,
46 ) -> KeyPathResult<Vec<T>>
47 where
48 T: Send + Sync + KeyPathsOperable,
49 V: Send + Sync,
50 KeyPaths<T, V>: Send + Sync,
51 F: Fn(&V) -> bool + Send + Sync,
52 {
53 let result: Vec<T> = collection
54 .into_par_iter()
55 .filter(|item| {
56 let value = item.get_at_keypath(&keypath).unwrap_or_else(|_| {
57 panic!("KeyPath access failed in par_filter_by_keypath")
58 });
59 predicate(value)
60 })
61 .collect();
62 Ok(result)
63 }
64
65 pub fn par_find_by_keypath<T, V, F>(
67 collection: Vec<T>,
68 keypath: KeyPaths<T, V>,
69 predicate: F,
70 ) -> KeyPathResult<Option<T>>
71 where
72 T: Send + Sync + KeyPathsOperable,
73 V: Send + Sync,
74 KeyPaths<T, V>: Send + Sync,
75 F: Fn(&V) -> bool + Send + Sync,
76 {
77 let result = collection
78 .into_par_iter()
79 .find_any(|item| {
80 let value = item.get_at_keypath(&keypath).unwrap_or_else(|_| {
81 panic!("KeyPath access failed in par_find_by_keypath")
82 });
83 predicate(value)
84 });
85 Ok(result)
86 }
87
88 pub fn par_collect_keypath<T, V>(
90 collection: Vec<T>,
91 keypath: KeyPaths<T, V>,
92 ) -> KeyPathResult<Vec<V>>
93 where
94 T: Send + Sync + KeyPathsOperable,
95 V: Send + Sync + Clone,
96 KeyPaths<T, V>: Send + Sync,
97 {
98 let result: Vec<V> = collection
99 .into_par_iter()
100 .map(|item| {
101 let value = item.get_at_keypath(&keypath).unwrap_or_else(|_| {
102 panic!("KeyPath access failed in par_collect_keypath")
103 });
104 value.clone()
105 })
106 .collect();
107 Ok(result)
108 }
109
110 pub fn par_count_by_keypath<T, V, F>(
112 collection: Vec<T>,
113 keypath: KeyPaths<T, V>,
114 predicate: F,
115 ) -> KeyPathResult<usize>
116 where
117 T: Send + Sync + KeyPathsOperable,
118 V: Send + Sync,
119 KeyPaths<T, V>: Send + Sync,
120 F: Fn(&V) -> bool + Send + Sync,
121 {
122 let count = collection
123 .into_par_iter()
124 .filter(|item| {
125 let value = item.get_at_keypath(&keypath).unwrap_or_else(|_| {
126 panic!("KeyPath access failed in par_count_by_keypath")
127 });
128 predicate(value)
129 })
130 .count();
131 Ok(count)
132 }
133
134 pub fn par_any_by_keypath<T, V, F>(
136 collection: Vec<T>,
137 keypath: KeyPaths<T, V>,
138 predicate: F,
139 ) -> KeyPathResult<bool>
140 where
141 T: Send + Sync + KeyPathsOperable,
142 V: Send + Sync,
143 KeyPaths<T, V>: Send + Sync,
144 F: Fn(&V) -> bool + Send + Sync,
145 {
146 let result = collection
147 .into_par_iter()
148 .any(|item| {
149 let value = item.get_at_keypath(&keypath).unwrap_or_else(|_| {
150 panic!("KeyPath access failed in par_any_by_keypath")
151 });
152 predicate(value)
153 });
154 Ok(result)
155 }
156
157 pub fn par_all_by_keypath<T, V, F>(
159 collection: Vec<T>,
160 keypath: KeyPaths<T, V>,
161 predicate: F,
162 ) -> KeyPathResult<bool>
163 where
164 T: Send + Sync + KeyPathsOperable,
165 V: Send + Sync,
166 KeyPaths<T, V>: Send + Sync,
167 F: Fn(&V) -> bool + Send + Sync,
168 {
169 let result = collection
170 .into_par_iter()
171 .all(|item| {
172 let value = item.get_at_keypath(&keypath).unwrap_or_else(|_| {
173 panic!("KeyPath access failed in par_all_by_keypath")
174 });
175 predicate(value)
176 });
177 Ok(result)
178 }
179}
180
181#[cfg(feature = "parallel")]
182pub mod parallel_pools {
184 use super::*;
185 use rayon::{ThreadPool, ThreadPoolBuilder};
186
187 pub fn create_keypath_thread_pool(num_threads: usize) -> Result<ThreadPool, KeyPathError> {
189 ThreadPoolBuilder::new()
190 .num_threads(num_threads)
191 .build()
192 .map_err(|e| KeyPathError::ParallelError {
193 message: format!("Failed to create thread pool: {}", e),
194 })
195 }
196
197 pub fn execute_on_pool<T, V, F, R>(
199 pool: &ThreadPool,
200 collection: Vec<T>,
201 keypath: KeyPaths<T, V>,
202 operation: F,
203 ) -> KeyPathResult<Vec<R>>
204 where
205 T: Send + Sync + KeyPathsOperable,
206 V: Send + Sync,
207 KeyPaths<T, V>: Send + Sync,
208 F: Fn(&V) -> R + Send + Sync,
209 R: Send,
210 {
211 pool.install(|| {
212 parallel_collections::par_map_keypath(collection, keypath, operation)
213 })
214 }
215
216 pub fn filter_on_pool<T, V, F>(
218 pool: &ThreadPool,
219 collection: Vec<T>,
220 keypath: KeyPaths<T, V>,
221 predicate: F,
222 ) -> KeyPathResult<Vec<T>>
223 where
224 T: Send + Sync + KeyPathsOperable,
225 V: Send + Sync,
226 KeyPaths<T, V>: Send + Sync,
227 F: Fn(&V) -> bool + Send + Sync,
228 {
229 pool.install(|| {
230 parallel_collections::par_filter_by_keypath(collection, keypath, predicate)
231 })
232 }
233}