lattice_qcd_rs/
thread.rs

1//! Tool for easy use of multi threading.
2
3use std::{
4    any::Any,
5    collections::HashMap,
6    error::Error,
7    fmt::{self, Display, Formatter},
8    hash::Hash,
9    iter::Iterator,
10    sync::{mpsc, Arc, Mutex},
11    vec::Vec,
12};
13
14use crossbeam::thread;
15use rayon::iter::IntoParallelIterator;
16use rayon::prelude::ParallelIterator;
17#[cfg(feature = "serde-serialize")]
18use serde::{Deserialize, Serialize};
19
20use super::lattice::{LatticeCyclic, LatticeElementToIndex};
21
/// Multithreading error.
///
/// This can be converted to [`ThreadError`], which is more convenient to use but keeps only the
/// panic payloads that are [`String`] or `&str` messages.
#[derive(Debug)]
#[non_exhaustive]
pub enum ThreadAnyError {
    /// Tried to run some jobs with 0 threads.
    ThreadNumberIncorrect,
    /// One or more of the threads panicked. Each [`Box`] holds the payload of one panic,
    /// see the [`run_pool_parallel`] example.
    Panic(Vec<Box<dyn Any + Send + 'static>>),
}

impl Display for ThreadAnyError {
    /// Write a human readable description of the error.
    ///
    /// For [`ThreadAnyError::Panic`] the payloads that are a [`String`] or a `&str` are written
    /// between double quotes; any other payload is written with its [`Debug`](fmt::Debug)
    /// representation. Several payloads are separated by `" ,"` and enclosed in brackets.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            Self::ThreadNumberIncorrect => write!(f, "number of thread is incorrect"),
            Self::Panic(payloads) => {
                let n = payloads.len();
                match n {
                    // Should not happen in practice, but an instance with an empty vec can be
                    // built by hand.
                    0 => return write!(f, "0 thread panicked"),
                    // The trailing space keeps the payload from being glued to "with".
                    1 => write!(f, "a thread panicked with ")?,
                    _ => write!(f, "{} threads panicked with [", n)?,
                }

                for (index, payload) in payloads.iter().enumerate() {
                    if index > 0 {
                        write!(f, " ,")?;
                    }

                    if let Some(message) = payload.downcast_ref::<String>() {
                        write!(f, "\"{}\"", message)?;
                    }
                    else if let Some(message) = payload.downcast_ref::<&str>() {
                        write!(f, "\"{}\"", message)?;
                    }
                    else {
                        write!(f, "{:?}", payload)?;
                    }
                }

                // The bracket is only opened when there are several payloads.
                if n > 1 {
                    write!(f, "]")?;
                }

                Ok(())
            }
        }
    }
}

impl Error for ThreadAnyError {}
78
/// Multithreading error with a string panic message.
///
/// It is more convenient to use than [`ThreadAnyError`] and can be converted from it.
/// The conversion keeps the panic messages of type [`String`] and `&str`; any other payload
/// becomes [`None`].
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
#[non_exhaustive]
pub enum ThreadError {
    /// Tried to run some jobs with 0 threads.
    ThreadNumberIncorrect,
    /// One or more of the threads panicked with the given messages,
    /// see the [`run_pool_parallel`] example.
    Panic(Vec<Option<String>>),
}

impl Display for ThreadError {
    /// Write a human readable description of the error.
    ///
    /// For [`ThreadError::Panic`] each message is written between double quotes and a missing
    /// message is written as `None`. Several messages are separated by `" ,"` and enclosed in
    /// brackets.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            Self::ThreadNumberIncorrect => write!(f, "number of thread is incorrect"),
            Self::Panic(messages) => {
                let n = messages.len();
                match n {
                    // This should not be used but it is possible to create an instance with an
                    // empty vec.
                    0 => return write!(f, "0 thread panicked"),
                    // The trailing space keeps the message from being glued to "with".
                    1 => write!(f, "a thread panicked with ")?,
                    _ => write!(f, "{} threads panicked with [", n)?,
                }

                for (index, message) in messages.iter().enumerate() {
                    if index > 0 {
                        write!(f, " ,")?;
                    }

                    match message {
                        Some(text) => write!(f, "\"{}\"", text)?,
                        None => write!(f, "None")?,
                    }
                }

                // The bracket is only opened when there are several messages.
                if n > 1 {
                    write!(f, "]")?;
                }

                Ok(())
            }
        }
    }
}

impl Error for ThreadError {}
134
135impl From<ThreadAnyError> for ThreadError {
136    #[allow(clippy::manual_map)] // clarity / false positive ?
137    fn from(f: ThreadAnyError) -> Self {
138        match f {
139            ThreadAnyError::ThreadNumberIncorrect => Self::ThreadNumberIncorrect,
140            ThreadAnyError::Panic(any) => Self::Panic(
141                any.iter()
142                    .map(|element| {
143                        if let Some(string) = element.downcast_ref::<String>() {
144                            Some(string.clone())
145                        }
146                        else if let Some(string) = element.downcast_ref::<&str>() {
147                            Some(string.to_string())
148                        }
149                        else {
150                            None
151                        }
152                    })
153                    .collect(),
154            ),
155        }
156    }
157}
158
159impl From<ThreadError> for ThreadAnyError {
160    fn from(f: ThreadError) -> Self {
161        match f {
162            ThreadError::ThreadNumberIncorrect => Self::ThreadNumberIncorrect,
163            ThreadError::Panic(strings) => Self::Panic(
164                strings
165                    .iter()
166                    .map(|string| -> Box<dyn Any + Send + 'static> {
167                        if let Some(string) = string {
168                            Box::new(string.clone())
169                        }
170                        else {
171                            Box::new("".to_string())
172                        }
173                    })
174                    .collect(),
175            ),
176        }
177    }
178}
179
180/// run jobs in parallel.
181///
182/// The pool of job is given by `iter`. the job is given by `closure` that have the form `|key,common_data| -> Data`.
183/// `number_of_thread` determine the number of job done in parallel and should be greater than 0,
184/// otherwise return [`ThreadAnyError::ThreadNumberIncorrect`].
185/// `capacity` is used to determine the capacity of the [`HashMap`] upon initiation (see [`HashMap::with_capacity`])
186///
187/// # Errors
188/// Returns [`ThreadAnyError::ThreadNumberIncorrect`] is the number of threads is 0.
189/// Returns [`ThreadAnyError::Panic`] if a thread panicked. Contains the panic message.
190///
191/// # Example
192/// let us computes the value of `i^2 * c` for i in \[2,9999\] with 4 threads
193/// ```
194/// # use lattice_qcd_rs::thread::run_pool_parallel;
195/// # use std::error::Error;
196///
197/// # fn main() -> Result<(), Box<dyn Error>> {
198/// let iter = 2..10000;
199/// let c = 5;
200/// // we could have put 4 inside the closure but this demonstrate how to use common data
201/// let result = run_pool_parallel(iter, &c, &|i, c| i * i * c, 4, 10000 - 2)?;
202/// assert_eq!(*result.get(&40).unwrap(), 40 * 40 * c);
203/// assert_eq!(result.get(&1), None);
204/// # Ok(())
205/// # }
206/// ```
207/// In the next example a thread will panic, we demonstrate the return type.
208/// ```should_panic
209/// # use lattice_qcd_rs::thread::{run_pool_parallel, ThreadAnyError};
210/// let iter = 0..10;
211/// let result = run_pool_parallel(iter, &(), &|_, _| panic!("{}", "panic message"), 4, 10);
212/// match result {
213///     Ok(_) => {}
214///     Err(err) => panic!("{}", err),
215/// }
216/// ```
217/// This give the following panic message
218/// ```textrust
219/// stderr:
220/// thread '<unnamed>' panicked at 'panic message', src\thread.rs:6:51
221/// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
222/// thread '<unnamed>' panicked at 'panic message', src\thread.rs:6:51
223/// thread '<unnamed>' panicked at 'panic message', src\thread.rs:6:51
224/// thread '<unnamed>' panicked at 'panic message', src\thread.rs:6:51
225/// thread 'main' panicked at '4 threads panicked with ["panic message" ,"panic message" ,"panic message" ,"panic message"]', src\thread.rs:9:17
226/// ```
227pub fn run_pool_parallel<Key, Data, CommonData, F>(
228    iter: impl Iterator<Item = Key> + Send,
229    common_data: &CommonData,
230    closure: &F,
231    number_of_thread: usize,
232    capacity: usize,
233) -> Result<HashMap<Key, Data>, ThreadAnyError>
234where
235    CommonData: Sync,
236    Key: Eq + Hash + Send + Clone + Sync,
237    Data: Send,
238    F: Sync + Clone + Fn(&Key, &CommonData) -> Data,
239{
240    run_pool_parallel_with_initializations_mutable(
241        iter,
242        common_data,
243        &|_, key, common| closure(key, common),
244        &|| (),
245        number_of_thread,
246        capacity,
247    )
248}
249
250/// run jobs in parallel. Similar to [`run_pool_parallel`] but with initiation.
251///
252/// see [`run_pool_parallel`]. Moreover let some data to be initialize per thread.
253/// closure_init is run once per thread and store inside a mutable data which closure can modify.
254///
255/// # Errors
256/// Returns [`ThreadAnyError::ThreadNumberIncorrect`] is the number of threads is 0.
257/// Returns [`ThreadAnyError::Panic`] if a thread panicked. Contains the panic message.
258///
259/// # Examples
260/// Let us create some value but we will greet the user from the threads
261/// ```
262/// # use lattice_qcd_rs::thread::run_pool_parallel_with_initializations_mutable;
263/// # use std::error::Error;
264///
265/// # fn main() -> Result<(), Box<dyn Error>> {
266/// let iter = 0_u128..100000_u128;
267/// let c = 5_u128;
268/// // we could have put 4 inside the closure but this demonstrate how to use common data
269/// let result = run_pool_parallel_with_initializations_mutable(
270///     iter,
271///     &c,
272///     &|has_greeted: &mut bool, i, c| {
273///         if !*has_greeted {
274///             *has_greeted = true;
275///             println!("Hello from the thread");
276///         }
277///         i * i * c
278///     },
279///     || false,
280///     4,
281///     100000,
282/// )?;
283/// # Ok(())
284/// # }
285/// ```
286/// will print "Hello from the thread" four times.
287///
288/// Another useful application is to use an rng
289/// ```
290/// extern crate rand;
291/// extern crate rand_distr;
292/// use lattice_qcd_rs::field::Su3Adjoint;
293/// use lattice_qcd_rs::lattice::LatticeCyclic;
294/// use lattice_qcd_rs::thread::run_pool_parallel_with_initializations_mutable;
295/// # use std::error::Error;
296///
297/// # fn main() -> Result<(), Box<dyn Error>> {
298/// let l = LatticeCyclic::<4>::new(1_f64, 4)?;
299/// let distribution = rand::distributions::Uniform::from(-1_f64..1_f64);
300/// let result = run_pool_parallel_with_initializations_mutable(
301///     l.get_links(),
302///     &distribution,
303///     &|rng, _, d| Su3Adjoint::random(rng, d).to_su3(),
304///     rand::thread_rng,
305///     4,
306///     l.number_of_canonical_links_space(),
307/// )?;
308/// # Ok(())
309/// # }
310/// ```
311#[allow(clippy::needless_return)] // for readability
312#[allow(clippy::semicolon_if_nothing_returned)] // I actually want to returns a never in the future
313pub fn run_pool_parallel_with_initializations_mutable<Key, Data, CommonData, InitData, F, FInit>(
314    iter: impl Iterator<Item = Key> + Send,
315    common_data: &CommonData,
316    closure: &F,
317    closure_init: FInit,
318    number_of_thread: usize,
319    capacity: usize,
320) -> Result<HashMap<Key, Data>, ThreadAnyError>
321where
322    CommonData: Sync,
323    Key: Eq + Hash + Send + Clone + Sync,
324    Data: Send,
325    F: Sync + Clone + Fn(&mut InitData, &Key, &CommonData) -> Data,
326    FInit: Send + Clone + FnOnce() -> InitData,
327{
328    if number_of_thread == 0 {
329        return Err(ThreadAnyError::ThreadNumberIncorrect);
330    }
331    else if number_of_thread == 1 {
332        let mut hash_map = HashMap::<Key, Data>::with_capacity(capacity);
333        let mut init_data = closure_init();
334        for i in iter {
335            std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
336                hash_map.insert(i.clone(), closure(&mut init_data, &i, common_data))
337            }))
338            .map_err(|err| ThreadAnyError::Panic(vec![err]))?;
339        }
340        return Ok(hash_map);
341    }
342    else {
343        let result = thread::scope(|s| {
344            let mutex_iter = Arc::new(Mutex::new(iter));
345            let mut threads = Vec::with_capacity(number_of_thread);
346            let (result_tx, result_rx) = mpsc::channel::<(Key, Data)>();
347            for _ in 0..number_of_thread {
348                let iter_clone = Arc::clone(&mutex_iter);
349                let transmitter = result_tx.clone();
350                let closure_init_clone = closure_init.clone();
351                let handel = s.spawn(move |_| {
352                    let mut init_data = closure_init_clone();
353                    loop {
354                        let val = iter_clone.lock().unwrap().next();
355                        match val {
356                            Some(i) => transmitter
357                                .send((i.clone(), closure(&mut init_data, &i, common_data)))
358                                .unwrap(),
359                            None => break,
360                        }
361                    }
362                });
363                threads.push(handel);
364            }
365            // we drop channel so we can properly assert if they are closed
366            drop(result_tx);
367            let mut hash_map = HashMap::<Key, Data>::with_capacity(capacity);
368            for message in result_rx {
369                let (key, data) = message;
370                hash_map.insert(key, data);
371            }
372
373            let panics = threads
374                .into_iter()
375                .map(|handel| handel.join())
376                .filter_map(|res| res.err())
377                .collect::<Vec<_>>();
378            if !panics.is_empty() {
379                return Err(ThreadAnyError::Panic(panics));
380            }
381
382            Ok(hash_map)
383        })
384        .unwrap_or_else(|err| {
385            if err
386                .downcast_ref::<Vec<Box<dyn Any + 'static + Send>>>()
387                .is_some()
388            {
389                unreachable!("a failing handle is not joined")
390            }
391            unreachable!("main thread panicked")
392        });
393        return result;
394    }
395}
396
397/// run jobs in parallel. Similar to [`run_pool_parallel`] but return a vector.
398///
399/// Now a reference to the lattice must be given and `key` must implement the trait
400/// [`super::lattice::LatticeElementToIndex`].
401/// [`super::lattice::LatticeElementToIndex::to_index`] will be use to insert the data inside the vector.
402/// While computing because the thread can operate out of order, fill the data not yet computed by `default_data`
403/// `capacity` is used to determine the capacity of the [`std::vec::Vec`] upon initiation
404/// (see [`std::vec::Vec::with_capacity`]).
405///
406/// # Errors
407/// Returns [`ThreadAnyError::ThreadNumberIncorrect`] is the number of threads is 0.
408/// Returns [`ThreadAnyError::Panic`] if a thread panicked. Contains the panic message.
409///
410/// # Example
411/// ```
412/// use lattice_qcd_rs::field::Su3Adjoint;
413/// use lattice_qcd_rs::lattice::{LatticeCyclic, LatticeElementToIndex, LatticePoint};
414/// use lattice_qcd_rs::thread::run_pool_parallel_vec;
415/// # use std::error::Error;
416///
417/// # fn main() -> Result<(), Box<dyn Error>> {
418/// let l = LatticeCyclic::<4>::new(1_f64, 4)?;
419/// let c = 5_usize;
420/// let result = run_pool_parallel_vec(
421///     l.get_points(),
422///     &c,
423///     &|i: &LatticePoint<4>, c: &usize| i[0] * c,
424///     4,
425///     l.number_of_canonical_links_space(),
426///     &l,
427///     &0,
428/// )?;
429/// let point = LatticePoint::new([3, 0, 5, 0].into());
430/// assert_eq!(result[point.to_index(&l)], point[0] * c);
431/// # Ok(())
432/// # }
433/// ```
434pub fn run_pool_parallel_vec<Key, Data, CommonData, F, const D: usize>(
435    iter: impl Iterator<Item = Key> + Send,
436    common_data: &CommonData,
437    closure: &F,
438    number_of_thread: usize,
439    capacity: usize,
440    l: &LatticeCyclic<D>,
441    default_data: &Data,
442) -> Result<Vec<Data>, ThreadAnyError>
443where
444    CommonData: Sync,
445    Key: Eq + Send + Clone + Sync + LatticeElementToIndex<D>,
446    Data: Send + Clone,
447    F: Sync + Clone + Fn(&Key, &CommonData) -> Data,
448{
449    run_pool_parallel_vec_with_initializations_mutable(
450        iter,
451        common_data,
452        &|_, key, common| closure(key, common),
453        &|| (),
454        number_of_thread,
455        capacity,
456        l,
457        default_data,
458    )
459}
460
461// TODO convert closure for conversion key -> usize
462
463/// run jobs in parallel. Similar to [`run_pool_parallel_vec`] but with initiation.
464///
465/// # Errors
466/// Returns [`ThreadAnyError::ThreadNumberIncorrect`] is the number of threads is 0.
467/// Returns [`ThreadAnyError::Panic`] if a thread panicked. Contains the panic message.
468///
469/// # Examples
470/// Let us create some value but we will greet the user from the threads
471/// ```
472/// use lattice_qcd_rs::lattice::{LatticeCyclic, LatticeElementToIndex, LatticePoint};
473/// use lattice_qcd_rs::thread::run_pool_parallel_vec_with_initializations_mutable;
474/// # use std::error::Error;
475///
476/// # fn main() -> Result<(), Box<dyn Error>> {
477/// let l = LatticeCyclic::<4>::new(1_f64, 25)?;
478/// let iter = l.get_points();
479/// let c = 5_usize;
480/// // we could have put 4 inside the closure but this demonstrate how to use common data
481/// let result = run_pool_parallel_vec_with_initializations_mutable(
482///     iter,
483///     &c,
484///     &|has_greeted: &mut bool, i: &LatticePoint<4>, c: &usize| {
485///         if !*has_greeted {
486///             *has_greeted = true;
487///             println!("Hello from the thread");
488///         }
489///         i[0] * c
490///     },
491///     || false,
492///     4,
493///     100000,
494///     &l,
495///     &0,
496/// )?;
497/// # Ok(())
498/// # }
499/// ```
500/// will print "Hello from the thread" four times.
501///
502/// Another useful application is to use an rng
503/// ```
504/// extern crate rand;
505/// extern crate rand_distr;
506/// extern crate nalgebra;
507/// use lattice_qcd_rs::field::Su3Adjoint;
508/// use lattice_qcd_rs::lattice::LatticeCyclic;
509/// use lattice_qcd_rs::thread::run_pool_parallel_vec_with_initializations_mutable;
510/// # use std::error::Error;
511///
512/// # fn main() -> Result<(), Box<dyn Error>> {
513/// let l = LatticeCyclic::<4>::new(1_f64, 4)?;
514/// let distribution = rand::distributions::Uniform::from(-1_f64..1_f64);
515/// let result = run_pool_parallel_vec_with_initializations_mutable(
516///     l.get_links(),
517///     &distribution,
518///     &|rng, _, d| Su3Adjoint::random(rng, d).to_su3(),
519///     rand::thread_rng,
520///     4,
521///     l.number_of_canonical_links_space(),
522///     &l,
523///     &nalgebra::Matrix3::<nalgebra::Complex<f64>>::zeros(),
524/// )?;
525/// # Ok(())
526/// # }
527/// ```
528#[allow(clippy::too_many_arguments)]
529#[allow(clippy::needless_return)] // for readability
530#[allow(clippy::semicolon_if_nothing_returned)] // I actually want to return a never in the future
531pub fn run_pool_parallel_vec_with_initializations_mutable<
532    Key,
533    Data,
534    CommonData,
535    InitData,
536    F,
537    FInit,
538    const D: usize,
539>(
540    iter: impl Iterator<Item = Key> + Send,
541    common_data: &CommonData,
542    closure: &F,
543    closure_init: FInit,
544    number_of_thread: usize,
545    capacity: usize,
546    l: &LatticeCyclic<D>,
547    default_data: &Data,
548) -> Result<Vec<Data>, ThreadAnyError>
549where
550    CommonData: Sync,
551    Key: Eq + Send + Clone + Sync,
552    Data: Send + Clone,
553    F: Sync + Clone + Fn(&mut InitData, &Key, &CommonData) -> Data,
554    FInit: Send + Clone + FnOnce() -> InitData,
555    Key: LatticeElementToIndex<D>,
556{
557    if number_of_thread == 0 {
558        return Err(ThreadAnyError::ThreadNumberIncorrect);
559    }
560    else if number_of_thread == 1 {
561        let mut vec = Vec::<Data>::with_capacity(capacity);
562        let mut init_data = closure_init();
563        for i in iter {
564            std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
565                insert_in_vec(
566                    &mut vec,
567                    i.clone().to_index(l),
568                    closure(&mut init_data, &i, common_data),
569                    default_data,
570                );
571            }))
572            .map_err(|err| ThreadAnyError::Panic(vec![err]))?;
573        }
574        return Ok(vec);
575    }
576    else {
577        let result = thread::scope(|s| {
578            // I try to put the thread creation in a function but the life time annotation were a mess.
579            // I did not manage to make it working.
580            let mutex_iter = Arc::new(Mutex::new(iter));
581            let mut threads = Vec::with_capacity(number_of_thread);
582            let (result_tx, result_rx) = mpsc::channel::<(Key, Data)>();
583            for _ in 0..number_of_thread {
584                let iter_clone = Arc::clone(&mutex_iter);
585                let transmitter = result_tx.clone();
586                let closure_init_clone = closure_init.clone();
587                let handel = s.spawn(move |_| {
588                    let mut init_data = closure_init_clone();
589                    loop {
590                        let val = iter_clone.lock().unwrap().next();
591                        match val {
592                            Some(i) => transmitter
593                                .send((i.clone(), closure(&mut init_data, &i, common_data)))
594                                .unwrap(),
595                            None => break,
596                        }
597                    }
598                });
599                threads.push(handel);
600            }
601            // we drop channel so we can properly assert if they are closed
602            drop(result_tx);
603            let mut vec = Vec::<Data>::with_capacity(capacity);
604            for message in result_rx {
605                let (key, data) = message;
606                insert_in_vec(&mut vec, key.to_index(l), data, default_data);
607            }
608
609            let panics = threads
610                .into_iter()
611                .map(|handel| handel.join())
612                .filter_map(|res| res.err())
613                .collect::<Vec<_>>();
614            if !panics.is_empty() {
615                return Err(ThreadAnyError::Panic(panics));
616            }
617
618            Ok(vec)
619        })
620        .unwrap_or_else(|err| {
621            if err
622                .downcast_ref::<Vec<Box<dyn Any + 'static + Send>>>()
623                .is_some()
624            {
625                unreachable!("a failing handle is not joined")
626            }
627            unreachable!("main thread panicked")
628        });
629        return result;
630    }
631}
632
/// Set the value inside the vec at position `pos`. If the position is not in the vec yet,
/// extend the vec with clones of `default_data` up to `pos - 1` and then push `data` at `pos`.
///
/// # Example
/// ```
/// # use lattice_qcd_rs::thread::insert_in_vec;
/// use std::vec::Vec;
///
/// let mut vec = vec![];
/// insert_in_vec(&mut vec, 0, 1, &0);
/// assert_eq!(vec, vec![1]);
/// insert_in_vec(&mut vec, 3, 9, &0);
/// assert_eq!(vec, vec![1, 0, 0, 9]);
/// insert_in_vec(&mut vec, 5, 10, &1);
/// assert_eq!(vec, vec![1, 0, 0, 9, 1, 10]);
/// insert_in_vec(&mut vec, 1, 3, &1);
/// assert_eq!(vec, vec![1, 3, 0, 9, 1, 10]);
/// ```
pub fn insert_in_vec<Data>(vec: &mut Vec<Data>, pos: usize, data: Data, default_data: &Data)
where
    Data: Clone,
{
    match vec.get_mut(pos) {
        // The slot already exists: overwrite it.
        Some(slot) => *slot = data,
        None => {
            // Pad the gap (if any) with the default value, then append the data at `pos`.
            vec.resize_with(pos, || default_data.clone());
            vec.push(data);
        }
    }
}
665
666/// Run a parallel pool using external crate [`rayon`].
667///
668/// # Example.
669/// ```
670/// # use lattice_qcd_rs::thread::run_pool_parallel_rayon;
671/// let iter = 0..1000;
672/// let c = 5;
673/// let result = run_pool_parallel_rayon(iter, &c, |i, c1| i * i * c1);
674/// assert_eq!(result[687], 687 * 687 * c);
675/// assert_eq!(result[10], 10 * 10 * c);
676/// ```
677/// # Panic.
678/// panic if the closure panic at any point during the evaluation
679/// ```should_panic
680/// # use lattice_qcd_rs::thread::run_pool_parallel_rayon;
681/// let iter = 0..10;
682/// let result = run_pool_parallel_rayon(iter, &(), |_, _| panic!("message"));
683/// ```
684pub fn run_pool_parallel_rayon<Key, Data, CommonData, F>(
685    iter: impl Iterator<Item = Key> + Send,
686    common_data: &CommonData,
687    closure: F,
688) -> Vec<Data>
689where
690    CommonData: Sync,
691    Key: Eq + Send,
692    Data: Send,
693    F: Sync + Fn(&Key, &CommonData) -> Data,
694{
695    iter.collect::<Vec<Key>>()
696        .into_par_iter()
697        .map(|el| closure(&el, common_data))
698        .collect()
699}
700
#[cfg(test)]
mod test {
    use std::error::Error;

    use super::*;
    use crate::error::ImplementationError;

    /// Check the `Display` output, the (absent) error source and the conversions between
    /// [`ThreadAnyError`] and [`ThreadError`].
    #[test]
    fn thread_error() {
        // Display of `ThreadAnyError`.
        assert_eq!(
            ThreadAnyError::ThreadNumberIncorrect.to_string(),
            "number of thread is incorrect"
        );
        assert!(ThreadAnyError::Panic(vec![Box::new(())])
            .to_string()
            .contains("a thread panicked"));
        assert!(ThreadAnyError::Panic(vec![Box::new("message 1")])
            .to_string()
            .contains("message 1"));
        assert!(ThreadAnyError::Panic(vec![])
            .to_string()
            .contains("0 thread panicked"));

        // `ThreadAnyError` never exposes a source.
        assert!(ThreadAnyError::ThreadNumberIncorrect.source().is_none());
        assert!(ThreadAnyError::Panic(vec![Box::new(())]).source().is_none());
        assert!(
            ThreadAnyError::Panic(vec![Box::new(ImplementationError::Unreachable)])
                .source()
                .is_none()
        );
        assert!(ThreadAnyError::Panic(vec![Box::new("test")])
            .source()
            .is_none());

        // Display of `ThreadError`.
        assert_eq!(
            ThreadError::ThreadNumberIncorrect.to_string(),
            "number of thread is incorrect"
        );
        assert!(ThreadError::Panic(vec![None])
            .to_string()
            .contains("a thread panicked"));
        assert!(ThreadError::Panic(vec![None, None])
            .to_string()
            .contains("2 threads panicked"));
        assert!(ThreadError::Panic(vec![Some("message 1".to_string())])
            .to_string()
            .contains("message 1"));
        assert!(ThreadError::Panic(vec![])
            .to_string()
            .contains("0 thread panicked"));

        // `ThreadError` never exposes a source.
        assert!(ThreadError::ThreadNumberIncorrect.source().is_none());
        assert!(ThreadError::Panic(vec![None]).source().is_none());
        assert!(ThreadError::Panic(vec![Some("".to_string())])
            .source()
            .is_none());
        assert!(ThreadError::Panic(vec![Some("test".to_string())])
            .source()
            .is_none());

        // Conversions of the `Panic` variant in both directions.
        let any_error = ThreadAnyError::Panic(vec![
            Box::new(()),
            Box::new("t1"),
            Box::new("t2".to_string()),
        ]);
        let any_error_round_trip = ThreadAnyError::Panic(vec![
            Box::new(""),
            Box::new("t1".to_string()),
            Box::new("t2".to_string()),
        ]);
        let string_error =
            ThreadError::Panic(vec![None, Some("t1".to_string()), Some("t2".to_string())]);
        assert_eq!(ThreadError::from(any_error), string_error);
        assert_eq!(
            ThreadAnyError::from(string_error).to_string(),
            any_error_round_trip.to_string()
        );

        // Conversions of the `ThreadNumberIncorrect` variant in both directions.
        let any_error = ThreadAnyError::ThreadNumberIncorrect;
        let string_error = ThreadError::ThreadNumberIncorrect;
        assert_eq!(ThreadError::from(any_error), string_error);
        let any_error = ThreadAnyError::ThreadNumberIncorrect;
        assert_eq!(
            ThreadAnyError::from(string_error).to_string(),
            any_error.to_string()
        );
    }
}