pub mod threadlake;

pub mod threadutilities;
pub mod iterators;

pub mod builder;

pub mod traits;

pub mod disjointer;

pub mod split;

#[cfg(test)]
mod tests {
    use crate::builder::Builder;
    use crate::disjointer::Disjointer;
    use crate::threadlake::ThreadLake;
    use crate::threadutilities::ThreadUtilities;
    use crate::traits::FullParallelism;
    use std::sync::Mutex;
    use std::time::Duration;

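    // Smoke test: spawn four threads, print a greeting from each, and wait for them all.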
    #[test]
    fn hello_lakes() {
        let lake = Builder::new(4)
            .spawn(|x: ThreadUtilities<_>| {
                println!("Hello from thread {}", x.index());
            });

        lake.join();
    }

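    // Splits the sum of 0..n across ten threads; each worker returns a partial sum and
    // join_iter collects the per-thread results (assuming x.range(n) yields this thread's
    // share of 0..n, as the expected total implies).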
    #[test]
    fn sum() {
        let n = 1000005;

        let lake: ThreadLake<_, usize> = Builder::new(10)
            .spawn(move |x| x.range(n).sum());

        let total: usize = lake.join_iter().map(|x| x.unwrap()).sum();

        assert_eq!(total, 500004500010);
    }

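    // Finds the index of the single non-zero element in parallel: each thread scans its
    // slice of the shared vector and translates the local position back to a global index
    // (assuming split_slice hands out contiguous chunks of width() elements per thread).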
    #[test]
    fn index_search() {
        let mut test_vector = vec![0; 1000000];

        test_vector[759246] = 100;

        let lake = Builder::with_data(10, test_vector)
            .spawn(|x: ThreadUtilities<_>| {
                let v = x.data();
                let subslice = x.split_slice(v.as_slice());

                subslice.iter().enumerate().find_map(|(ind, val)| {
                    if *val != 0 {
                        Some(ind + subslice.width() * x.index())
                    } else {
                        None
                    }
                })
            });

        println!("{:?}", lake.join_iter().find(|x| x.as_ref().unwrap().is_some()));
    }

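    // Thread 0 signals the main thread, which then calls stop(); the workers poll
    // x.check() until the stop request becomes visible (check()/stop() semantics are
    // inferred from this usage).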
    #[test]
    fn simple_stop() {
        let lake = Builder::new(5)
            .spawn(|x| {
                if x.index() == 0 {
                    x.send(()).unwrap();
                }

                while !x.check() {
                    std::thread::sleep(Duration::from_millis(100));
                }
            });

        lake.receiver().recv().unwrap();

        lake.stop();

        lake.join();
    }

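    // Every worker sends its index through the lake's channel; the main thread receives
    // one message per spawned thread (lake.max_threads()) before joining.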
    #[test]
    fn simple_messages() {
        let lake = Builder::new(FullParallelism)
            .spawn(|x| {
                x.send(x.index()).unwrap();

                std::thread::sleep(Duration::from_millis(100));
            });

        for _ in 0..lake.max_threads() {
            lake.receiver().recv().unwrap();
        }

        lake.join();
    }

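    // Parallel predicate search used by search_test below: each thread scans its slice and
    // sends Some(()) on the first match, or None once its slice is exhausted; the caller
    // returns true as soon as any thread reports a hit.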
    fn multithread_search<T, P>(data: Vec<T>, predicate: P) -> bool
    where
        T: Send + Sync + 'static,
        P: Fn(&T) -> bool + Send + Sync + 'static,
    {
        let lake = Builder::with_data(FullParallelism, (data, predicate))
            .spawn(|x: ThreadUtilities<_, _>| {
                {
                    let (data, pred) = x.data();
                    let subslice = x.split_slice(data.as_slice());

                    for element in subslice {
                        if (pred)(element) {
                            x.send(Some(())).ok();
                            return;
                        }
                    }
                }
                x.send(None).ok();
            });

        for _ in 0..lake.max_threads() {
            if lake.receiver().recv().unwrap().is_some() {
                return true;
            }
        }

        false
    }

    #[test]
    fn search_test() {
        let list: Vec<usize> = (0..1000000).collect();

        assert!(multithread_search(list.clone(), |x| *x == 10000));
        assert!(!multithread_search(list.clone(), |x| *x == 1000001));
    }

    #[test]
    fn panic_test() {
        let lake = Builder::new(2)
            .names(|x: usize| format!("Panicable thread number {}", x))
            .spawn(|x: ThreadUtilities<_>| {
                println!("thread name: {}", x.name());
                panic!("This panic is deliberate, used to test that the user-specified thread names show in panic messages and ThreadUtilities");
            });

        assert!(lake.join_iter().all(|x| x.is_err()));
    }

    #[test]
    fn name_test() {
        let lake = Builder::new(3)
            .names(|x: usize| format!("My Thread {}", x))
            .spawn(|_: ThreadUtilities<_>| {});

        for (i, (_, name)) in lake.thread_iter().enumerate() {
            assert_eq!(name, format!("My Thread {}", i).as_str());
        }

        lake.join();
    }

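    // Disjointer splits the vector into per-thread pieces (data().piece(&x)) that can be
    // mutated without locking; a second pass then checks that every element ends up equal
    // to its global index.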
    #[test]
    fn disjoint_test() {
        let v = vec![0; 100000];

        let v = Disjointer::new(v);

        let lake = Builder::with_data(FullParallelism, v)
            .spawn(|x: ThreadUtilities<_>| {
                let mut subslice = x.data().piece(&x);

                let offset = subslice.width() * x.index();

                for (i, element) in subslice.iter_mut().enumerate() {
                    *element += i + offset;
                }
            });

        let d = lake.join().unwrap().take();

        let lake = Builder::with_data(FullParallelism, d)
            .spawn(|x: ThreadUtilities<_>| {
                let subslice = x.split_slice(x.data());

                let offset = subslice.width() * x.index();

                subslice.iter().enumerate().all(|(i, x)| *x == i + offset)
            });

        assert!(lake.join_iter().all(|x| x.unwrap()));
    }

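    // Shares a Mutex<Vec<i32>> between threads: each worker pushes the values in its slice
    // whose log2 is an exact integer, i.e. the powers of two (0 is included because
    // log2(0) is -inf, which equals its own floor).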
    #[test]
    fn mutex_test() {
        let test_vector: Vec<i32> = (0..100000).collect();

        let results = Mutex::new(Vec::<i32>::new());

        let lake = Builder::with_data(FullParallelism, (test_vector, results))
            .spawn(|x: ThreadUtilities<_>| {
                let subslice = x.split_slice(&x.data().0);

                for element in subslice {
                    if (*element as f64).log2() == (*element as f64).log2().floor() {
                        let mut res = x.data().1.lock().unwrap();

                        res.push(*element);
                    }
                }
            });

        let (_, results) = lake.join().unwrap();

        let mut results = results.into_inner().unwrap();

        results.sort();

        assert_eq!(results, vec![0, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536]);
    }
}