1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
use crate::parallel::{num_threads, Reducer}; use crossbeam_utils::thread; pub fn join<O1: Send, O2: Send>(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) { thread::scope(|s| { let left = s.spawn(|_| left()); let right = s.spawn(|_| right()); (left.join().unwrap(), right.join().unwrap()) }) .unwrap() } pub fn in_parallel<I, S, O, R>( input: impl Iterator<Item = I> + Send, thread_limit: Option<usize>, new_thread_state: impl Fn(usize) -> S + Send + Sync, consume: impl Fn(I, &mut S) -> O + Send + Sync, mut reducer: R, ) -> Result<<R as Reducer>::Output, <R as Reducer>::Error> where R: Reducer<Input = O>, I: Send, O: Send, { let num_threads = num_threads(thread_limit); let new_thread_state = &new_thread_state; let consume = &consume; thread::scope(move |s| { let receive_result = { let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads); let (send_result, receive_result) = std::sync::mpsc::sync_channel::<O>(num_threads); for thread_id in 0..num_threads { s.spawn({ let send_result = send_result.clone(); let receive_input = receive_input.clone(); move |_| { let mut state = new_thread_state(thread_id); for item in receive_input { if send_result.send(consume(item, &mut state)).is_err() { break; } } } }); } s.spawn(move |_| { for item in input { if send_input.send(item).is_err() { break; } } }); receive_result }; for item in receive_result { reducer.feed(item)?; } reducer.finalize() }) .unwrap() }