1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use crossbeam_channel as channel;
use std::iter::FromIterator;
use std::thread;
use thread_util::JoinOnDrop;
pub trait Reducer<I> {
type Output;
fn reduce(&mut self, input: I);
fn output(self) -> Self::Output;
}
pub struct CollectReducer<I, O> {
sender: channel::Sender<I>,
thread_handle: JoinOnDrop<O>,
}
impl<I, O> CollectReducer<I, O> {
pub(crate) fn new() -> CollectReducer<I, O>
where
O: FromIterator<I> + Send + 'static,
I: Send + 'static,
{
let (tx, rx) = channel::bounded(0);
let thread_handle = JoinOnDrop::wrap(thread::spawn(move || {
let rx2 = rx.clone();
let output = O::from_iter(rx2);
rx.for_each(drop);
output
}));
CollectReducer {
sender: tx,
thread_handle,
}
}
}
impl<I, O> Reducer<I> for CollectReducer<I, O> {
type Output = O;
fn reduce(&mut self, input: I) {
self.sender.send(input);
}
fn output(self) -> Self::Output {
drop(self.sender);
self.thread_handle.join().expect("thread panicked")
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
#[test]
fn collect_vec() {
let mut reducer = CollectReducer::<_, Vec<_>>::new();
reducer.reduce(100);
reducer.reduce(200);
assert_eq!(reducer.output(), vec![100, 200]);
}
#[test]
fn collect_results_err() {
let mut reducer = CollectReducer::<_, Result<Vec<_>, _>>::new();
reducer.reduce(Ok(1));
reducer.reduce(Ok(3));
reducer.reduce(Err(4));
reducer.reduce(Ok(4));
assert_eq!(reducer.output(), Err(4));
}
#[test]
fn collect_results_ok() {
let mut reducer = CollectReducer::<_, Result<Vec<_>, ()>>::new();
reducer.reduce(Ok(1));
reducer.reduce(Ok(3));
reducer.reduce(Ok(4));
assert_eq!(reducer.output(), Ok(vec![1, 3, 4]));
}
#[test]
fn collect_hashmap() {
let mut reducer = CollectReducer::<_, HashMap<i32, i32>>::new();
reducer.reduce((1, 2));
reducer.reduce((2, 4));
reducer.reduce((1, 8));
assert_eq!(reducer.output(), [(1, 8), (2, 4)].iter().cloned().collect());
}
#[test]
fn dropped_without_output() {
let mut reducer = CollectReducer::<_, Vec<_>>::new();
reducer.reduce(100);
reducer.reduce(200);
}
}