lattice_qcd_rs/thread.rs
1//! Tool for easy use of multi threading.
2
3use std::{
4 any::Any,
5 collections::HashMap,
6 error::Error,
7 fmt::{self, Display, Formatter},
8 hash::Hash,
9 iter::Iterator,
10 sync::{mpsc, Arc, Mutex},
11 vec::Vec,
12};
13
14use crossbeam::thread;
15use rayon::iter::IntoParallelIterator;
16use rayon::prelude::ParallelIterator;
17#[cfg(feature = "serde-serialize")]
18use serde::{Deserialize, Serialize};
19
20use super::lattice::{LatticeCyclic, LatticeElementToIndex};
21
22/// Multithreading error.
23///
24/// This can be converted to [`ThreadError`] which is more convenient to use keeping only the case
25/// with [`String`] and [`&str`] messages.
26#[derive(Debug)]
27#[non_exhaustive]
28pub enum ThreadAnyError {
29 /// Tried to run some jobs with 0 threads
30 ThreadNumberIncorrect,
31 /// One or more of the threads panicked. Inside the [`Box`] is the panic message.
32 /// see [`run_pool_parallel`] example.
33 Panic(Vec<Box<dyn Any + Send + 'static>>),
34}
35
36impl Display for ThreadAnyError {
37 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
38 match self {
39 Self::ThreadNumberIncorrect => write!(f, "number of thread is incorrect"),
40 Self::Panic(any) => {
41 let n = any.len();
42 if n == 0 {
43 write!(f, "0 thread panicked")?;
44 }
45 else if n == 1 {
46 write!(f, "a thread panicked with")?;
47 }
48 else {
49 write!(f, "{} threads panicked with [", n)?;
50 }
51
52 for (index, element_any) in any.iter().enumerate() {
53 if let Some(string) = element_any.downcast_ref::<String>() {
54 write!(f, "\"{}\"", string)?;
55 }
56 else if let Some(string) = element_any.downcast_ref::<&str>() {
57 write!(f, "\"{}\"", string)?;
58 }
59 else {
60 write!(f, "{:?}", element_any)?;
61 }
62
63 if index < any.len() - 1 {
64 write!(f, " ,")?;
65 }
66 else if n > 1 {
67 write!(f, "]")?;
68 }
69 }
70
71 Ok(())
72 }
73 }
74 }
75}
76
77impl Error for ThreadAnyError {}
78
79/// Multithreading error with a string panic message.
80///
81/// It is more convenient to use compared to [`ThreadAnyError`] and can be converted from it.
82/// It convert message of type [`String`] and [`&str`] otherwise set it to None.
83#[derive(Debug, Clone, Hash, PartialEq, Eq)]
84#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
85#[non_exhaustive]
86pub enum ThreadError {
87 /// Tried to run some jobs with 0 threads
88 ThreadNumberIncorrect,
89 /// One of the thread panicked with the given messages.
90 /// see [`run_pool_parallel`] example.
91 Panic(Vec<Option<String>>),
92}
93
94impl Display for ThreadError {
95 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
96 match self {
97 Self::ThreadNumberIncorrect => write!(f, "number of thread is incorrect"),
98 Self::Panic(strings) => {
99 let n = strings.len();
100 if n == 0 {
101 // this should not be used but it is possible to create an instance with an empty vec.
102 write!(f, "0 thread panicked")?;
103 }
104 else if n == 1 {
105 write!(f, "a thread panicked with")?;
106 }
107 else {
108 write!(f, "{} threads panicked with [", n)?;
109 }
110
111 for (index, string) in strings.iter().enumerate() {
112 if let Some(string) = string {
113 write!(f, "\"{}\"", string)?;
114 }
115 else {
116 write!(f, "None")?;
117 }
118
119 if index < strings.len() - 1 {
120 write!(f, " ,")?;
121 }
122 else if n > 1 {
123 write!(f, "]")?;
124 }
125 }
126
127 Ok(())
128 }
129 }
130 }
131}
132
133impl Error for ThreadError {}
134
135impl From<ThreadAnyError> for ThreadError {
136 #[allow(clippy::manual_map)] // clarity / false positive ?
137 fn from(f: ThreadAnyError) -> Self {
138 match f {
139 ThreadAnyError::ThreadNumberIncorrect => Self::ThreadNumberIncorrect,
140 ThreadAnyError::Panic(any) => Self::Panic(
141 any.iter()
142 .map(|element| {
143 if let Some(string) = element.downcast_ref::<String>() {
144 Some(string.clone())
145 }
146 else if let Some(string) = element.downcast_ref::<&str>() {
147 Some(string.to_string())
148 }
149 else {
150 None
151 }
152 })
153 .collect(),
154 ),
155 }
156 }
157}
158
159impl From<ThreadError> for ThreadAnyError {
160 fn from(f: ThreadError) -> Self {
161 match f {
162 ThreadError::ThreadNumberIncorrect => Self::ThreadNumberIncorrect,
163 ThreadError::Panic(strings) => Self::Panic(
164 strings
165 .iter()
166 .map(|string| -> Box<dyn Any + Send + 'static> {
167 if let Some(string) = string {
168 Box::new(string.clone())
169 }
170 else {
171 Box::new("".to_string())
172 }
173 })
174 .collect(),
175 ),
176 }
177 }
178}
179
180/// run jobs in parallel.
181///
182/// The pool of job is given by `iter`. the job is given by `closure` that have the form `|key,common_data| -> Data`.
183/// `number_of_thread` determine the number of job done in parallel and should be greater than 0,
184/// otherwise return [`ThreadAnyError::ThreadNumberIncorrect`].
185/// `capacity` is used to determine the capacity of the [`HashMap`] upon initiation (see [`HashMap::with_capacity`])
186///
187/// # Errors
188/// Returns [`ThreadAnyError::ThreadNumberIncorrect`] is the number of threads is 0.
189/// Returns [`ThreadAnyError::Panic`] if a thread panicked. Contains the panic message.
190///
191/// # Example
192/// let us computes the value of `i^2 * c` for i in \[2,9999\] with 4 threads
193/// ```
194/// # use lattice_qcd_rs::thread::run_pool_parallel;
195/// # use std::error::Error;
196///
197/// # fn main() -> Result<(), Box<dyn Error>> {
198/// let iter = 2..10000;
199/// let c = 5;
200/// // we could have put 4 inside the closure but this demonstrate how to use common data
201/// let result = run_pool_parallel(iter, &c, &|i, c| i * i * c, 4, 10000 - 2)?;
202/// assert_eq!(*result.get(&40).unwrap(), 40 * 40 * c);
203/// assert_eq!(result.get(&1), None);
204/// # Ok(())
205/// # }
206/// ```
207/// In the next example a thread will panic, we demonstrate the return type.
208/// ```should_panic
209/// # use lattice_qcd_rs::thread::{run_pool_parallel, ThreadAnyError};
210/// let iter = 0..10;
211/// let result = run_pool_parallel(iter, &(), &|_, _| panic!("{}", "panic message"), 4, 10);
212/// match result {
213/// Ok(_) => {}
214/// Err(err) => panic!("{}", err),
215/// }
216/// ```
217/// This give the following panic message
218/// ```textrust
219/// stderr:
220/// thread '<unnamed>' panicked at 'panic message', src\thread.rs:6:51
221/// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
222/// thread '<unnamed>' panicked at 'panic message', src\thread.rs:6:51
223/// thread '<unnamed>' panicked at 'panic message', src\thread.rs:6:51
224/// thread '<unnamed>' panicked at 'panic message', src\thread.rs:6:51
225/// thread 'main' panicked at '4 threads panicked with ["panic message" ,"panic message" ,"panic message" ,"panic message"]', src\thread.rs:9:17
226/// ```
227pub fn run_pool_parallel<Key, Data, CommonData, F>(
228 iter: impl Iterator<Item = Key> + Send,
229 common_data: &CommonData,
230 closure: &F,
231 number_of_thread: usize,
232 capacity: usize,
233) -> Result<HashMap<Key, Data>, ThreadAnyError>
234where
235 CommonData: Sync,
236 Key: Eq + Hash + Send + Clone + Sync,
237 Data: Send,
238 F: Sync + Clone + Fn(&Key, &CommonData) -> Data,
239{
240 run_pool_parallel_with_initializations_mutable(
241 iter,
242 common_data,
243 &|_, key, common| closure(key, common),
244 &|| (),
245 number_of_thread,
246 capacity,
247 )
248}
249
250/// run jobs in parallel. Similar to [`run_pool_parallel`] but with initiation.
251///
252/// see [`run_pool_parallel`]. Moreover let some data to be initialize per thread.
253/// closure_init is run once per thread and store inside a mutable data which closure can modify.
254///
255/// # Errors
256/// Returns [`ThreadAnyError::ThreadNumberIncorrect`] is the number of threads is 0.
257/// Returns [`ThreadAnyError::Panic`] if a thread panicked. Contains the panic message.
258///
259/// # Examples
260/// Let us create some value but we will greet the user from the threads
261/// ```
262/// # use lattice_qcd_rs::thread::run_pool_parallel_with_initializations_mutable;
263/// # use std::error::Error;
264///
265/// # fn main() -> Result<(), Box<dyn Error>> {
266/// let iter = 0_u128..100000_u128;
267/// let c = 5_u128;
268/// // we could have put 4 inside the closure but this demonstrate how to use common data
269/// let result = run_pool_parallel_with_initializations_mutable(
270/// iter,
271/// &c,
272/// &|has_greeted: &mut bool, i, c| {
273/// if !*has_greeted {
274/// *has_greeted = true;
275/// println!("Hello from the thread");
276/// }
277/// i * i * c
278/// },
279/// || false,
280/// 4,
281/// 100000,
282/// )?;
283/// # Ok(())
284/// # }
285/// ```
286/// will print "Hello from the thread" four times.
287///
288/// Another useful application is to use an rng
289/// ```
290/// extern crate rand;
291/// extern crate rand_distr;
292/// use lattice_qcd_rs::field::Su3Adjoint;
293/// use lattice_qcd_rs::lattice::LatticeCyclic;
294/// use lattice_qcd_rs::thread::run_pool_parallel_with_initializations_mutable;
295/// # use std::error::Error;
296///
297/// # fn main() -> Result<(), Box<dyn Error>> {
298/// let l = LatticeCyclic::<4>::new(1_f64, 4)?;
299/// let distribution = rand::distributions::Uniform::from(-1_f64..1_f64);
300/// let result = run_pool_parallel_with_initializations_mutable(
301/// l.get_links(),
302/// &distribution,
303/// &|rng, _, d| Su3Adjoint::random(rng, d).to_su3(),
304/// rand::thread_rng,
305/// 4,
306/// l.number_of_canonical_links_space(),
307/// )?;
308/// # Ok(())
309/// # }
310/// ```
311#[allow(clippy::needless_return)] // for readability
312#[allow(clippy::semicolon_if_nothing_returned)] // I actually want to returns a never in the future
313pub fn run_pool_parallel_with_initializations_mutable<Key, Data, CommonData, InitData, F, FInit>(
314 iter: impl Iterator<Item = Key> + Send,
315 common_data: &CommonData,
316 closure: &F,
317 closure_init: FInit,
318 number_of_thread: usize,
319 capacity: usize,
320) -> Result<HashMap<Key, Data>, ThreadAnyError>
321where
322 CommonData: Sync,
323 Key: Eq + Hash + Send + Clone + Sync,
324 Data: Send,
325 F: Sync + Clone + Fn(&mut InitData, &Key, &CommonData) -> Data,
326 FInit: Send + Clone + FnOnce() -> InitData,
327{
328 if number_of_thread == 0 {
329 return Err(ThreadAnyError::ThreadNumberIncorrect);
330 }
331 else if number_of_thread == 1 {
332 let mut hash_map = HashMap::<Key, Data>::with_capacity(capacity);
333 let mut init_data = closure_init();
334 for i in iter {
335 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
336 hash_map.insert(i.clone(), closure(&mut init_data, &i, common_data))
337 }))
338 .map_err(|err| ThreadAnyError::Panic(vec![err]))?;
339 }
340 return Ok(hash_map);
341 }
342 else {
343 let result = thread::scope(|s| {
344 let mutex_iter = Arc::new(Mutex::new(iter));
345 let mut threads = Vec::with_capacity(number_of_thread);
346 let (result_tx, result_rx) = mpsc::channel::<(Key, Data)>();
347 for _ in 0..number_of_thread {
348 let iter_clone = Arc::clone(&mutex_iter);
349 let transmitter = result_tx.clone();
350 let closure_init_clone = closure_init.clone();
351 let handel = s.spawn(move |_| {
352 let mut init_data = closure_init_clone();
353 loop {
354 let val = iter_clone.lock().unwrap().next();
355 match val {
356 Some(i) => transmitter
357 .send((i.clone(), closure(&mut init_data, &i, common_data)))
358 .unwrap(),
359 None => break,
360 }
361 }
362 });
363 threads.push(handel);
364 }
365 // we drop channel so we can properly assert if they are closed
366 drop(result_tx);
367 let mut hash_map = HashMap::<Key, Data>::with_capacity(capacity);
368 for message in result_rx {
369 let (key, data) = message;
370 hash_map.insert(key, data);
371 }
372
373 let panics = threads
374 .into_iter()
375 .map(|handel| handel.join())
376 .filter_map(|res| res.err())
377 .collect::<Vec<_>>();
378 if !panics.is_empty() {
379 return Err(ThreadAnyError::Panic(panics));
380 }
381
382 Ok(hash_map)
383 })
384 .unwrap_or_else(|err| {
385 if err
386 .downcast_ref::<Vec<Box<dyn Any + 'static + Send>>>()
387 .is_some()
388 {
389 unreachable!("a failing handle is not joined")
390 }
391 unreachable!("main thread panicked")
392 });
393 return result;
394 }
395}
396
397/// run jobs in parallel. Similar to [`run_pool_parallel`] but return a vector.
398///
399/// Now a reference to the lattice must be given and `key` must implement the trait
400/// [`super::lattice::LatticeElementToIndex`].
401/// [`super::lattice::LatticeElementToIndex::to_index`] will be use to insert the data inside the vector.
402/// While computing because the thread can operate out of order, fill the data not yet computed by `default_data`
403/// `capacity` is used to determine the capacity of the [`std::vec::Vec`] upon initiation
404/// (see [`std::vec::Vec::with_capacity`]).
405///
406/// # Errors
407/// Returns [`ThreadAnyError::ThreadNumberIncorrect`] is the number of threads is 0.
408/// Returns [`ThreadAnyError::Panic`] if a thread panicked. Contains the panic message.
409///
410/// # Example
411/// ```
412/// use lattice_qcd_rs::field::Su3Adjoint;
413/// use lattice_qcd_rs::lattice::{LatticeCyclic, LatticeElementToIndex, LatticePoint};
414/// use lattice_qcd_rs::thread::run_pool_parallel_vec;
415/// # use std::error::Error;
416///
417/// # fn main() -> Result<(), Box<dyn Error>> {
418/// let l = LatticeCyclic::<4>::new(1_f64, 4)?;
419/// let c = 5_usize;
420/// let result = run_pool_parallel_vec(
421/// l.get_points(),
422/// &c,
423/// &|i: &LatticePoint<4>, c: &usize| i[0] * c,
424/// 4,
425/// l.number_of_canonical_links_space(),
426/// &l,
427/// &0,
428/// )?;
429/// let point = LatticePoint::new([3, 0, 5, 0].into());
430/// assert_eq!(result[point.to_index(&l)], point[0] * c);
431/// # Ok(())
432/// # }
433/// ```
434pub fn run_pool_parallel_vec<Key, Data, CommonData, F, const D: usize>(
435 iter: impl Iterator<Item = Key> + Send,
436 common_data: &CommonData,
437 closure: &F,
438 number_of_thread: usize,
439 capacity: usize,
440 l: &LatticeCyclic<D>,
441 default_data: &Data,
442) -> Result<Vec<Data>, ThreadAnyError>
443where
444 CommonData: Sync,
445 Key: Eq + Send + Clone + Sync + LatticeElementToIndex<D>,
446 Data: Send + Clone,
447 F: Sync + Clone + Fn(&Key, &CommonData) -> Data,
448{
449 run_pool_parallel_vec_with_initializations_mutable(
450 iter,
451 common_data,
452 &|_, key, common| closure(key, common),
453 &|| (),
454 number_of_thread,
455 capacity,
456 l,
457 default_data,
458 )
459}
460
461// TODO convert closure for conversion key -> usize
462
463/// run jobs in parallel. Similar to [`run_pool_parallel_vec`] but with initiation.
464///
465/// # Errors
466/// Returns [`ThreadAnyError::ThreadNumberIncorrect`] is the number of threads is 0.
467/// Returns [`ThreadAnyError::Panic`] if a thread panicked. Contains the panic message.
468///
469/// # Examples
470/// Let us create some value but we will greet the user from the threads
471/// ```
472/// use lattice_qcd_rs::lattice::{LatticeCyclic, LatticeElementToIndex, LatticePoint};
473/// use lattice_qcd_rs::thread::run_pool_parallel_vec_with_initializations_mutable;
474/// # use std::error::Error;
475///
476/// # fn main() -> Result<(), Box<dyn Error>> {
477/// let l = LatticeCyclic::<4>::new(1_f64, 25)?;
478/// let iter = l.get_points();
479/// let c = 5_usize;
480/// // we could have put 4 inside the closure but this demonstrate how to use common data
481/// let result = run_pool_parallel_vec_with_initializations_mutable(
482/// iter,
483/// &c,
484/// &|has_greeted: &mut bool, i: &LatticePoint<4>, c: &usize| {
485/// if !*has_greeted {
486/// *has_greeted = true;
487/// println!("Hello from the thread");
488/// }
489/// i[0] * c
490/// },
491/// || false,
492/// 4,
493/// 100000,
494/// &l,
495/// &0,
496/// )?;
497/// # Ok(())
498/// # }
499/// ```
500/// will print "Hello from the thread" four times.
501///
502/// Another useful application is to use an rng
503/// ```
504/// extern crate rand;
505/// extern crate rand_distr;
506/// extern crate nalgebra;
507/// use lattice_qcd_rs::field::Su3Adjoint;
508/// use lattice_qcd_rs::lattice::LatticeCyclic;
509/// use lattice_qcd_rs::thread::run_pool_parallel_vec_with_initializations_mutable;
510/// # use std::error::Error;
511///
512/// # fn main() -> Result<(), Box<dyn Error>> {
513/// let l = LatticeCyclic::<4>::new(1_f64, 4)?;
514/// let distribution = rand::distributions::Uniform::from(-1_f64..1_f64);
515/// let result = run_pool_parallel_vec_with_initializations_mutable(
516/// l.get_links(),
517/// &distribution,
518/// &|rng, _, d| Su3Adjoint::random(rng, d).to_su3(),
519/// rand::thread_rng,
520/// 4,
521/// l.number_of_canonical_links_space(),
522/// &l,
523/// &nalgebra::Matrix3::<nalgebra::Complex<f64>>::zeros(),
524/// )?;
525/// # Ok(())
526/// # }
527/// ```
528#[allow(clippy::too_many_arguments)]
529#[allow(clippy::needless_return)] // for readability
530#[allow(clippy::semicolon_if_nothing_returned)] // I actually want to return a never in the future
531pub fn run_pool_parallel_vec_with_initializations_mutable<
532 Key,
533 Data,
534 CommonData,
535 InitData,
536 F,
537 FInit,
538 const D: usize,
539>(
540 iter: impl Iterator<Item = Key> + Send,
541 common_data: &CommonData,
542 closure: &F,
543 closure_init: FInit,
544 number_of_thread: usize,
545 capacity: usize,
546 l: &LatticeCyclic<D>,
547 default_data: &Data,
548) -> Result<Vec<Data>, ThreadAnyError>
549where
550 CommonData: Sync,
551 Key: Eq + Send + Clone + Sync,
552 Data: Send + Clone,
553 F: Sync + Clone + Fn(&mut InitData, &Key, &CommonData) -> Data,
554 FInit: Send + Clone + FnOnce() -> InitData,
555 Key: LatticeElementToIndex<D>,
556{
557 if number_of_thread == 0 {
558 return Err(ThreadAnyError::ThreadNumberIncorrect);
559 }
560 else if number_of_thread == 1 {
561 let mut vec = Vec::<Data>::with_capacity(capacity);
562 let mut init_data = closure_init();
563 for i in iter {
564 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
565 insert_in_vec(
566 &mut vec,
567 i.clone().to_index(l),
568 closure(&mut init_data, &i, common_data),
569 default_data,
570 );
571 }))
572 .map_err(|err| ThreadAnyError::Panic(vec![err]))?;
573 }
574 return Ok(vec);
575 }
576 else {
577 let result = thread::scope(|s| {
578 // I try to put the thread creation in a function but the life time annotation were a mess.
579 // I did not manage to make it working.
580 let mutex_iter = Arc::new(Mutex::new(iter));
581 let mut threads = Vec::with_capacity(number_of_thread);
582 let (result_tx, result_rx) = mpsc::channel::<(Key, Data)>();
583 for _ in 0..number_of_thread {
584 let iter_clone = Arc::clone(&mutex_iter);
585 let transmitter = result_tx.clone();
586 let closure_init_clone = closure_init.clone();
587 let handel = s.spawn(move |_| {
588 let mut init_data = closure_init_clone();
589 loop {
590 let val = iter_clone.lock().unwrap().next();
591 match val {
592 Some(i) => transmitter
593 .send((i.clone(), closure(&mut init_data, &i, common_data)))
594 .unwrap(),
595 None => break,
596 }
597 }
598 });
599 threads.push(handel);
600 }
601 // we drop channel so we can properly assert if they are closed
602 drop(result_tx);
603 let mut vec = Vec::<Data>::with_capacity(capacity);
604 for message in result_rx {
605 let (key, data) = message;
606 insert_in_vec(&mut vec, key.to_index(l), data, default_data);
607 }
608
609 let panics = threads
610 .into_iter()
611 .map(|handel| handel.join())
612 .filter_map(|res| res.err())
613 .collect::<Vec<_>>();
614 if !panics.is_empty() {
615 return Err(ThreadAnyError::Panic(panics));
616 }
617
618 Ok(vec)
619 })
620 .unwrap_or_else(|err| {
621 if err
622 .downcast_ref::<Vec<Box<dyn Any + 'static + Send>>>()
623 .is_some()
624 {
625 unreachable!("a failing handle is not joined")
626 }
627 unreachable!("main thread panicked")
628 });
629 return result;
630 }
631}
632
633/// Try setting the value inside the vec at position `pos`. If the position is not the array,
634/// build the array with default value up to `pos - 1` and insert data at `pos`.
635///
636/// # Example
637/// ```
638/// # use lattice_qcd_rs::thread::insert_in_vec;
639/// use std::vec::Vec;
640///
641/// let mut vec = vec![];
642/// insert_in_vec(&mut vec, 0, 1, &0);
643/// assert_eq!(vec, vec![1]);
644/// insert_in_vec(&mut vec, 3, 9, &0);
645/// assert_eq!(vec, vec![1, 0, 0, 9]);
646/// insert_in_vec(&mut vec, 5, 10, &1);
647/// assert_eq!(vec, vec![1, 0, 0, 9, 1, 10]);
648/// insert_in_vec(&mut vec, 1, 3, &1);
649/// assert_eq!(vec, vec![1, 3, 0, 9, 1, 10]);
650/// ```
651pub fn insert_in_vec<Data>(vec: &mut Vec<Data>, pos: usize, data: Data, default_data: &Data)
652where
653 Data: Clone,
654{
655 if pos < vec.len() {
656 vec[pos] = data;
657 }
658 else {
659 for _ in vec.len()..pos {
660 vec.push(default_data.clone());
661 }
662 vec.push(data);
663 }
664}
665
666/// Run a parallel pool using external crate [`rayon`].
667///
668/// # Example.
669/// ```
670/// # use lattice_qcd_rs::thread::run_pool_parallel_rayon;
671/// let iter = 0..1000;
672/// let c = 5;
673/// let result = run_pool_parallel_rayon(iter, &c, |i, c1| i * i * c1);
674/// assert_eq!(result[687], 687 * 687 * c);
675/// assert_eq!(result[10], 10 * 10 * c);
676/// ```
677/// # Panic.
678/// panic if the closure panic at any point during the evaluation
679/// ```should_panic
680/// # use lattice_qcd_rs::thread::run_pool_parallel_rayon;
681/// let iter = 0..10;
682/// let result = run_pool_parallel_rayon(iter, &(), |_, _| panic!("message"));
683/// ```
684pub fn run_pool_parallel_rayon<Key, Data, CommonData, F>(
685 iter: impl Iterator<Item = Key> + Send,
686 common_data: &CommonData,
687 closure: F,
688) -> Vec<Data>
689where
690 CommonData: Sync,
691 Key: Eq + Send,
692 Data: Send,
693 F: Sync + Fn(&Key, &CommonData) -> Data,
694{
695 iter.collect::<Vec<Key>>()
696 .into_par_iter()
697 .map(|el| closure(&el, common_data))
698 .collect()
699}
700
701#[cfg(test)]
702mod test {
703 use std::error::Error;
704
705 use super::*;
706 use crate::error::ImplementationError;
707
708 #[test]
709 fn thread_error() {
710 assert_eq!(
711 format!("{}", ThreadAnyError::ThreadNumberIncorrect),
712 "number of thread is incorrect"
713 );
714 assert!(
715 format!("{}", ThreadAnyError::Panic(vec![Box::new(())])).contains("a thread panicked")
716 );
717 assert!(
718 format!("{}", ThreadAnyError::Panic(vec![Box::new("message 1")])).contains("message 1")
719 );
720 assert!(format!("{}", ThreadAnyError::Panic(vec![])).contains("0 thread panicked"));
721
722 assert!(ThreadAnyError::ThreadNumberIncorrect.source().is_none());
723 assert!(ThreadAnyError::Panic(vec![Box::new(())]).source().is_none());
724 assert!(
725 ThreadAnyError::Panic(vec![Box::new(ImplementationError::Unreachable)])
726 .source()
727 .is_none()
728 );
729 assert!(ThreadAnyError::Panic(vec![Box::new("test")])
730 .source()
731 .is_none());
732 // -------
733 assert_eq!(
734 format!("{}", ThreadError::ThreadNumberIncorrect),
735 "number of thread is incorrect"
736 );
737 assert!(format!("{}", ThreadError::Panic(vec![None])).contains("a thread panicked"));
738 assert!(format!("{}", ThreadError::Panic(vec![None, None])).contains("2 threads panicked"));
739 assert!(format!(
740 "{}",
741 ThreadError::Panic(vec![Some("message 1".to_string())])
742 )
743 .contains("message 1"));
744 assert!(format!("{}", ThreadError::Panic(vec![])).contains("0 thread panicked"));
745
746 assert!(ThreadError::ThreadNumberIncorrect.source().is_none());
747 assert!(ThreadError::Panic(vec![None]).source().is_none());
748 assert!(ThreadError::Panic(vec![Some("".to_string())])
749 .source()
750 .is_none());
751 assert!(ThreadError::Panic(vec![Some("test".to_string())])
752 .source()
753 .is_none());
754 //---------------
755
756 let error = ThreadAnyError::Panic(vec![
757 Box::new(()),
758 Box::new("t1"),
759 Box::new("t2".to_string()),
760 ]);
761 let error2 = ThreadAnyError::Panic(vec![
762 Box::new(""),
763 Box::new("t1".to_string()),
764 Box::new("t2".to_string()),
765 ]);
766 let error3 = ThreadError::Panic(vec![None, Some("t1".to_string()), Some("t2".to_string())]);
767 assert_eq!(ThreadError::from(error), error3);
768 assert_eq!(ThreadAnyError::from(error3).to_string(), error2.to_string());
769
770 let error = ThreadAnyError::ThreadNumberIncorrect;
771 let error2 = ThreadError::ThreadNumberIncorrect;
772 assert_eq!(ThreadError::from(error), error2);
773 let error = ThreadAnyError::ThreadNumberIncorrect;
774 assert_eq!(ThreadAnyError::from(error2).to_string(), error.to_string());
775 }
776}