//! thread_lake/lib.rs — crate root for the thread_lake thread-pool library.

1
2///Main object used to manage thread pool
3pub mod threadlake;
4
5pub mod threadutilities;
6pub mod iterators;
7
8///Builder object to create thread pools
9pub mod builder;
10
11pub mod traits;
12
13///Object used to mutably access a vector by multiple threads simultaneously
14pub mod disjointer;
15
16///Objects encapsulating a subslice created from the [`threadutilities::ThreadUtilities`] functions
17pub mod split;
18
19
#[cfg(test)]
mod tests {
    use crate::threadlake::ThreadLake;
    use std::time::Duration;
    use crate::threadutilities::ThreadUtilities;
    use crate::builder::Builder;
    use crate::disjointer::Disjointer;
    use crate::traits::FullParallelism;
    use std::sync::Mutex;

    /// Smoke test: spawn 4 threads and print a greeting from each one.
    #[test]
    fn hello_lakes() {
        let lake = Builder::new(4)
            .spawn(|x: ThreadUtilities<_>| {
                println!("Hello from thread {}", x.index());
            });

        lake.join();
    }

    /// Sum 0..n across 10 threads; each thread sums its own sub-range and the
    /// main thread totals the per-thread results.
    #[test]
    fn sum() {
        let n = 1000005;

        //Create lake that will spawn 10 threads
        let lake: ThreadLake<_, usize> = Builder::new(10)
            .spawn(move |x| {
                //x.range(n) yields this thread's share of 0..n
                x.range(n).sum()
            });

        //Total up the sum from each thread
        let total: usize = lake.join_iter().map(|x| x.unwrap()).sum();

        //Closed form: n * (n - 1) / 2 for n = 1000005
        assert_eq!(total, 500004500010);
    }

    /// Find the global index of the single non-zero element by splitting the
    /// vector across 10 threads.
    #[test]
    fn index_search() {
        let mut test_vector = vec![0; 1000000];

        test_vector[759246] = 100;

        let lake = Builder::with_data(10, test_vector)
            .spawn(|x: ThreadUtilities<_>| {
                let v = x.data();

                let subslice = x.split_slice(v.as_slice());

                //Translate the local index back to a global one using the
                //subslice width and this thread's index
                subslice.iter().enumerate().find_map(|(ind, val)| {
                    if *val != 0 { Some(ind + subslice.width() * x.index()) } else { None }
                })
            });

        println!("{:?}", lake.join_iter().find(|x| x.as_ref().unwrap().is_some()));
    }

    /// Verify the main thread can tell the workers to stop via `stop()`.
    #[test]
    fn simple_stop() {
        let lake = Builder::new(5)
            .spawn(|x| {
                if x.index() == 0 {
                    //0th thread sends a message to main thread
                    x.send(()).unwrap();
                }

                //If check is true, we must stop the thread
                while !x.check() {
                    std::thread::sleep(Duration::from_millis(100));
                }
            });

        //Main thread waits for the message
        lake.receiver().recv().unwrap();

        //When the main thread gets the message, we send the stop signal
        lake.stop();

        lake.join();
    }

    /// Every worker sends its index to the main thread; the main thread
    /// expects exactly one message per spawned thread.
    #[test]
    fn simple_messages() {
        let lake = Builder::new(FullParallelism)
            .spawn(|x| {
                x.send(x.index()).unwrap();

                std::thread::sleep(Duration::from_millis(100));
            });

        for _ in 0..lake.max_threads() {
            lake.receiver().recv().unwrap();
        }

        lake.join();
    }

    /// Parallel linear search: returns true as soon as any worker reports that
    /// `predicate` matched an element of its share of `data`.
    fn multithread_search<T, P>(data: Vec<T>, predicate: P) -> bool
        where
            T: Sync + 'static + Send,
            P: Fn(&T) -> bool + Sync + Send + 'static,
    {
        let lake = Builder::with_data(FullParallelism, (data, predicate))
            .spawn(|x: ThreadUtilities<_, _>| {
                {
                    let (data, pred) = x.data();
                    let subslice = x.split_slice(data.as_slice());

                    for element in subslice {
                        if (pred)(element) {
                            //When we find the element, send a 'found' message to the main thread, then terminate this thread
                            x.send(Some(())).ok();
                            return;
                        }
                    }
                }
                //This thread's slice contained no match
                x.send(None).ok();
            });

        //We expect one response from each thread
        for _ in 0..lake.max_threads() {
            //If a thread has found the result, return true. This could leave
            //other worker threads still searching, but the main thread will continue
            if lake.receiver().recv().unwrap().is_some() {
                return true;
            }
        }

        false
        //lake.join_iter().any(|x| x.unwrap())
    }

    /// Exercise multithread_search with one hit and one guaranteed miss.
    #[test]
    fn search_test() {
        let list: Vec<usize> = (0..1000000).collect();

        assert!(multithread_search(list.clone(), |x| *x == 10000));
        assert!(!multithread_search(list.clone(), |x| *x == 1000001));
    }

    /// Deliberately panic in every worker and verify each join result is Err,
    /// and that user-specified names appear in panic messages.
    #[test]
    fn panic_test() {
        let lake = Builder::new(2)
            .names(|x: usize| format!("Panicable thread number {}", x))
            .spawn(|x: ThreadUtilities<_>| {
                println!("thread name: {}", x.name());
                panic!("This panic is deliberate, used to test that the user-specified thread names show in panic messages and ThreadUtilities");
            });

        //Assert that all threads end with an error
        assert!(lake.join_iter().all(|x| x.is_err()));
    }

    /// Verify that user-supplied thread names are stored and iterable in order.
    #[test]
    fn name_test() {
        let lake = Builder::new(3)
            .names(|x: usize| format!("My Thread {}", x))
            .spawn(|_: ThreadUtilities<_>| {});

        for (i, (_, name)) in lake.thread_iter().enumerate() {
            assert_eq!(name, format!("My Thread {}", i).as_str());
        }

        lake.join();
    }

    //TODO(review): work-in-progress parallel reduce kept for reference — note
    //the unused `B` type parameter if this is ever revived.
    /*fn multithreaded_reduce<T, B, F>(list: Vec<T>, init: T, f: F) -> T
        where
            T: Sync + 'static + Send + Clone,
            F: Fn(T, &T) -> T + Sync + Send + 'static + Clone,
    {
        let lake = Builder::with_data(|x: Option<usize>| x.unwrap(), (list, f, init.clone()))
            .spawn(|x: ThreadUtilities<_>| {
                let (list, f, _) = x.data();

                let (slice, _) = x.split_slice(list.as_slice());

                let first = slice[0].clone();

                slice.iter().skip(1).fold(first, f)
            });

        let f = lake.data().1.clone();

        let mut init = init.clone();

        for element in lake.join_iter() {
            init = (f)(init, &element.unwrap());
        }

        init
    }*/

    /// Write disjoint regions of one vector from multiple threads through a
    /// Disjointer, then verify the contents with a second lake.
    #[test]
    fn disjoint_test() {
        let v = vec![0; 100000];

        let v = Disjointer::new(v);

        let lake = Builder::with_data(FullParallelism, v)
            .spawn(|x: ThreadUtilities<_>| {
                let mut subslice = x.data().piece(&x);

                let offset = subslice.width() * x.index();

                for (i, element) in subslice.iter_mut().enumerate() {
                    //i + offset is the index into the entire array, i the
                    //index within the subslice
                    *element += i + offset;
                }
            });

        let d = lake.join().unwrap().take();

        //Here we use another lake to verify the results, but the two algorithms could be combined into one
        let lake = Builder::with_data(FullParallelism, d)
            .spawn(|x: ThreadUtilities<_>| {
                let subslice = x.split_slice(x.data());

                let offset = subslice.width() * x.index();

                subslice.iter().enumerate().all(|(i, x)| *x == i + offset)
            });

        assert!(lake.join_iter().all(|x| x.unwrap()))
    }

    /// Collect powers of two found in parallel into a shared Mutex<Vec<_>>.
    #[test]
    fn mutex_test() {
        let test_vector: Vec<i32> = (0..100000).collect();

        let results = Mutex::new(Vec::<i32>::new());

        let lake = Builder::with_data(FullParallelism, (test_vector, results))
            .spawn(|x: ThreadUtilities<_>| {
                let subslice = x.split_slice(&x.data().0);

                for element in subslice {
                    //If we find a power of 2, add it to the results array
                    //(float log2 comparison; 0 also passes because log2(0)
                    //is -inf, which equals its own floor)
                    if (*element as f64).log2() == (*element as f64).log2().floor() {
                        let mut res = x.data().1.lock().unwrap();

                        res.push(*element);
                    }
                }
            });

        let (_, results) = lake.join().unwrap();

        let mut results = results.into_inner().unwrap();

        results.sort();

        assert_eq!(results, vec![0, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536]);
    }
}