gix_features/parallel/
eager_iter.rs1pub struct EagerIter<I: Iterator> {
6 receiver: std::sync::mpsc::Receiver<Vec<I::Item>>,
7 chunk: Option<std::vec::IntoIter<I::Item>>,
8 size_hint: (usize, Option<usize>),
9}
10
11impl<I> EagerIter<I>
12where
13 I: Iterator + Send + 'static,
14 <I as Iterator>::Item: Send,
15{
16 pub fn new(iter: I, chunk_size: usize, chunks_in_flight: usize) -> Self {
26 let (sender, receiver) = std::sync::mpsc::sync_channel(chunks_in_flight);
27 let size_hint = iter.size_hint();
28 assert!(chunk_size > 0, "non-zero chunk size is needed");
29
30 std::thread::spawn(move || {
31 let mut out = Vec::with_capacity(chunk_size);
32 for item in iter {
33 out.push(item);
34 if out.len() == chunk_size {
35 if sender.send(out).is_err() {
36 return;
37 }
38 out = Vec::with_capacity(chunk_size);
39 }
40 }
41 if !out.is_empty() {
42 sender.send(out).ok();
43 }
44 });
45 EagerIter {
46 receiver,
47 chunk: None,
48 size_hint,
49 }
50 }
51
52 fn fill_buf_and_pop(&mut self) -> Option<I::Item> {
53 self.chunk = self.receiver.recv().ok().map(|v| {
54 assert!(!v.is_empty());
55 v.into_iter()
56 });
57 self.chunk.as_mut().and_then(Iterator::next)
58 }
59}
60
61impl<I> Iterator for EagerIter<I>
62where
63 I: Iterator + Send + 'static,
64 <I as Iterator>::Item: Send,
65{
66 type Item = I::Item;
67
68 fn next(&mut self) -> Option<Self::Item> {
69 match self.chunk.as_mut() {
70 Some(chunk) => chunk.next().or_else(|| self.fill_buf_and_pop()),
71 None => self.fill_buf_and_pop(),
72 }
73 }
74
75 fn size_hint(&self) -> (usize, Option<usize>) {
76 self.size_hint
77 }
78}
79
80pub enum EagerIterIf<I: Iterator> {
82 Eager(EagerIter<I>),
84 OnDemand(I),
86}
87
88impl<I> EagerIterIf<I>
89where
90 I: Iterator + Send + 'static,
91 <I as Iterator>::Item: Send,
92{
93 pub fn new(condition: impl FnOnce() -> bool, iter: I, chunk_size: usize, chunks_in_flight: usize) -> Self {
97 if condition() {
98 EagerIterIf::Eager(EagerIter::new(iter, chunk_size, chunks_in_flight))
99 } else {
100 EagerIterIf::OnDemand(iter)
101 }
102 }
103}
104impl<I> Iterator for EagerIterIf<I>
105where
106 I: Iterator + Send + 'static,
107 <I as Iterator>::Item: Send,
108{
109 type Item = I::Item;
110
111 fn next(&mut self) -> Option<Self::Item> {
112 match self {
113 EagerIterIf::OnDemand(i) => i.next(),
114 EagerIterIf::Eager(i) => i.next(),
115 }
116 }
117
118 fn size_hint(&self) -> (usize, Option<usize>) {
119 match self {
120 EagerIterIf::OnDemand(i) => i.size_hint(),
121 EagerIterIf::Eager(i) => i.size_hint(),
122 }
123 }
124}