1use rayon::iter::plumbing::{Producer, ProducerCallback, bridge};
2use rayon::prelude::*;
3
/// Keyed access to values of type `T`.
///
/// `A` is the accessor (key) type and defaults to `usize`.
pub trait Reader<T, A = usize> {
    /// Error reported when a lookup cannot produce a value.
    type Error: std::error::Error;

    /// Fetches the value addressed by `accessor`.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the implementation fails to produce a
    /// value for `accessor`.
    fn get(&self, accessor: A) -> Result<T, Self::Error>;
}
8
/// A reader that can enumerate its own accessors.
///
/// `A` is the accessor type (defaults to `usize`). Elsewhere in this file the
/// accessors yielded by [`IndexedReader::iter`] are fed one by one to
/// `Reader::get`.
pub trait IndexedReader<T, A = usize> {
    /// Iterator type that yields the accessors.
    type Iter: Iterator<Item = A>;

    /// Returns an iterator over the accessors of this reader.
    fn iter(&self) -> Self::Iter;
}
29
/// Sequential iteration over everything a reader can produce.
pub trait IterableReader<'a, T> {
    /// Error attached to items that failed to load.
    type Error: std::error::Error;

    /// Iterates over every item, reporting each one as `Ok(value)` or
    /// `Err(error)`.
    fn iter(&'a self) -> impl Iterator<Item = Result<T, Self::Error>>;

    /// Iterates over the successfully read items only.
    ///
    /// Items for which [`IterableReader::iter`] yields `Err` are dropped
    /// silently; the error values are not reported anywhere.
    fn iter_ok(&'a self) -> impl Iterator<Item = T> {
        self.iter().filter_map(Result::ok)
    }
}
42
43pub trait ParIterableReader<'a, T>: Sync {
44 type Error: std::error::Error + Send;
45 fn par_iter(
46 &'a self,
47 ) -> impl ParallelIterator<Item = Result<T, Self::Error>>;
48 fn par_iter_ok(&'a self) -> impl ParallelIterator<Item = T>
49 where
50 T: Send,
51 {
52 self.par_iter().filter_map(|res| res.ok())
53 }
54}
55
56pub struct ReaderIter<'a, R, T>
57where
58 R: IndexedReader<T>,
59{
60 reader: &'a R,
61 iter: R::Iter,
62 _marker: std::marker::PhantomData<T>,
63}
64
65impl<'a, R, T> ReaderIter<'a, R, T>
66where
67 R: IndexedReader<T>,
68{
69 pub fn new(reader: &'a R) -> Self {
70 ReaderIter {
71 reader,
72 iter: reader.iter(),
73 _marker: std::marker::PhantomData,
74 }
75 }
76}
77
78impl<'a, R, T> Iterator for ReaderIter<'a, R, T>
79where
80 R: IndexedReader<T> + Reader<T>,
81{
82 type Item = Result<T, <R as Reader<T>>::Error>;
83
84 fn next(&mut self) -> Option<Self::Item> {
85 let index = self.iter.next()?;
86 Some(self.reader.get(index))
87 }
88
89 fn size_hint(&self) -> (usize, Option<usize>) {
90 self.iter.size_hint()
91 }
92}
93
94impl<'a, R, T> IterableReader<'a, T> for R
95where
96 R: IndexedReader<T> + Reader<T>,
97{
98 type Error = <R as Reader<T>>::Error;
99
100 fn iter(&'a self) -> impl Iterator<Item = Result<T, Self::Error>> {
101 ReaderIter::new(self)
102 }
103}
104
/// Parallel counterpart of `ReaderIter`: a reader plus the full list of
/// indices to look up in it.
pub struct ReaderParIter<'a, R, T> {
    /// Reader that the indices are resolved against.
    reader: &'a R,
    /// Indices to read, held in an owned `Vec` so the list has a known
    /// length.
    indices: Vec<usize>,
    /// Ties the otherwise unused item type `T` to the struct.
    _marker: std::marker::PhantomData<T>,
}
110
111impl<'a, R, T> ReaderParIter<'a, R, T>
112where
113 R: IndexedReader<T>,
114{
115 pub fn new(reader: &'a R) -> Self {
116 ReaderParIter {
117 reader,
118 indices: reader.iter().collect(),
119 _marker: std::marker::PhantomData,
120 }
121 }
122}
123
124impl<'a, R, T> ParallelIterator for ReaderParIter<'a, R, T>
125where
126 R: IndexedReader<T> + Reader<T, Error: Send> + Sync,
127 T: Send,
128{
129 type Item = Result<T, <R as Reader<T>>::Error>;
130
131 fn drive_unindexed<C>(self, consumer: C) -> C::Result
132 where
133 C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
134 {
135 bridge(self, consumer)
136 }
137}
138
139struct ReaderProducer<'a, R, T> {
140 reader: &'a R,
141 indices: Vec<usize>,
142 _marker: std::marker::PhantomData<T>,
143}
144
145struct ReaderProducerIter<'a, R, T> {
146 reader: &'a R,
147 indices: std::vec::IntoIter<usize>,
148 _marker: std::marker::PhantomData<T>,
149}
150
151impl<'a, R, T> Iterator for ReaderProducerIter<'a, R, T>
152where
153 R: Reader<T>,
154{
155 type Item = Result<T, <R as Reader<T>>::Error>;
156
157 fn next(&mut self) -> Option<Self::Item> {
158 self.indices.next().map(|i| self.reader.get(i))
159 }
160
161 fn size_hint(&self) -> (usize, Option<usize>) {
162 self.indices.size_hint()
163 }
164}
165
166impl<'a, R, T> DoubleEndedIterator for ReaderProducerIter<'a, R, T>
167where
168 R: Reader<T>,
169{
170 fn next_back(&mut self) -> Option<Self::Item> {
171 self.indices.next_back().map(|i| self.reader.get(i))
172 }
173}
174
175impl<'a, R, T> ExactSizeIterator for ReaderProducerIter<'a, R, T> where
176 R: Reader<T>
177{
178}
179
180impl<'a, R, T> Producer for ReaderProducer<'a, R, T>
181where
182 R: IndexedReader<T> + Reader<T, Error: Send> + Sync,
183 T: Send,
184{
185 type Item = Result<T, <R as Reader<T>>::Error>;
186 type IntoIter = ReaderProducerIter<'a, R, T>;
187
188 fn into_iter(self) -> Self::IntoIter {
189 ReaderProducerIter {
190 reader: self.reader,
191 indices: self.indices.into_iter(),
192 _marker: std::marker::PhantomData,
193 }
194 }
195
196 fn split_at(self, index: usize) -> (Self, Self) {
197 let right = self.indices[index..].to_vec();
198 let mut left = self.indices;
199 left.truncate(index);
200 (
201 ReaderProducer {
202 reader: self.reader,
203 indices: left,
204 _marker: std::marker::PhantomData,
205 },
206 ReaderProducer {
207 reader: self.reader,
208 indices: right,
209 _marker: std::marker::PhantomData,
210 },
211 )
212 }
213}
214
215impl<'a, R, T> IndexedParallelIterator for ReaderParIter<'a, R, T>
216where
217 R: IndexedReader<T> + Reader<T> + Sync,
218 T: Send,
219 <R as Reader<T>>::Error: Send,
220{
221 fn drive<C>(self, consumer: C) -> C::Result
222 where
223 C: rayon::iter::plumbing::Consumer<Self::Item>,
224 {
225 bridge(self, consumer)
226 }
227
228 fn len(&self) -> usize {
229 self.indices.len()
230 }
231
232 fn with_producer<CB>(self, callback: CB) -> CB::Output
233 where
234 CB: ProducerCallback<Self::Item>,
235 {
236 callback.callback(ReaderProducer {
237 reader: self.reader,
238 indices: self.indices,
239 _marker: std::marker::PhantomData,
240 })
241 }
242}
243
244impl<'a, R, T> ParIterableReader<'a, T> for R
245where
246 R: IndexedReader<T> + Reader<T> + Sync,
247 T: Send,
248 <R as Reader<T>>::Error: Send,
249{
250 type Error = <R as Reader<T>>::Error;
251
252 fn par_iter(
253 &'a self,
254 ) -> impl ParallelIterator<Item = Result<T, Self::Error>> {
255 ReaderParIter::new(self)
256 }
257}