Skip to main content

timsrust_utils/
reader.rs

1use rayon::iter::plumbing::{Producer, ProducerCallback, bridge};
2use rayon::prelude::*;
3
/// Random-access lookup of a single item of type `T`.
///
/// `A` is the accessor used to address an item; when omitted it is a plain
/// `usize` index.
pub trait Reader<T, A = usize> {
    /// Error returned when the addressed item cannot be read.
    type Error: std::error::Error;

    /// Reads the item addressed by `accessor`.
    fn get(&self, accessor: A) -> Result<T, Self::Error>;
}
8
9// pub trait ReadableFrom<R, A = usize>: Sized {
10//     type Error: std::error::Error;
11//     fn from_reader(reader: &R, accessor: A) -> Result<Self, Self::Error>;
12// }
13
14// impl<T, R> Reader<T> for R
15// where
16//     T: ReadableFrom<R>,
17// {
18//     type Error = <T as ReadableFrom<R>>::Error;
19
20//     fn get(&self, accessor: usize) -> Result<T, Self::Error> {
21//         T::from_reader(self, accessor)
22//     }
23// }
24
/// A reader that can enumerate the accessors (by default `usize` indices)
/// of the items it can produce.
pub trait IndexedReader<T, A = usize> {
    /// Iterator over every accessor, in iteration order.
    type Iter: Iterator<Item = A>;

    /// Returns a new iterator over all accessors; it borrows nothing from
    /// `self` beyond what `Self::Iter` itself holds.
    fn iter(&self) -> Self::Iter;
}
29
30// pub trait IndexableFrom<R, A = usize>: Sized {
31//     type Iter: Iterator<Item = A>;
32//     fn iter_from_reader(reader: &R) -> Self::Iter;
33// }
34
/// Sequential iteration over every item a reader can produce.
pub trait IterableReader<'a, T> {
    /// Error produced when reading a single item fails.
    type Error: std::error::Error;

    /// Returns an iterator yielding one `Result` per item.
    fn iter(&'a self) -> impl Iterator<Item = Result<T, Self::Error>>;

    /// Like [`iter`](Self::iter), but silently discards every `Err` and
    /// yields only the successfully read items.
    fn iter_ok(&'a self) -> impl Iterator<Item = T> {
        self.iter().filter_map(Result::ok)
    }
}
42
43pub trait ParIterableReader<'a, T>: Sync {
44    type Error: std::error::Error + Send;
45    fn par_iter(
46        &'a self,
47    ) -> impl ParallelIterator<Item = Result<T, Self::Error>>;
48    fn par_iter_ok(&'a self) -> impl ParallelIterator<Item = T>
49    where
50        T: Send,
51    {
52        self.par_iter().filter_map(|res| res.ok())
53    }
54}
55
56pub struct ReaderIter<'a, R, T>
57where
58    R: IndexedReader<T>,
59{
60    reader: &'a R,
61    iter: R::Iter,
62    _marker: std::marker::PhantomData<T>,
63}
64
65impl<'a, R, T> ReaderIter<'a, R, T>
66where
67    R: IndexedReader<T>,
68{
69    pub fn new(reader: &'a R) -> Self {
70        ReaderIter {
71            reader,
72            iter: reader.iter(),
73            _marker: std::marker::PhantomData,
74        }
75    }
76}
77
78impl<'a, R, T> Iterator for ReaderIter<'a, R, T>
79where
80    R: IndexedReader<T> + Reader<T>,
81{
82    type Item = Result<T, <R as Reader<T>>::Error>;
83
84    fn next(&mut self) -> Option<Self::Item> {
85        let index = self.iter.next()?;
86        Some(self.reader.get(index))
87    }
88
89    fn size_hint(&self) -> (usize, Option<usize>) {
90        self.iter.size_hint()
91    }
92}
93
94impl<'a, R, T> IterableReader<'a, T> for R
95where
96    R: IndexedReader<T> + Reader<T>,
97{
98    type Error = <R as Reader<T>>::Error;
99
100    fn iter(&'a self) -> impl Iterator<Item = Result<T, Self::Error>> {
101        ReaderIter::new(self)
102    }
103}
104
105pub struct ReaderParIter<'a, R, T> {
106    reader: &'a R,
107    indices: Vec<usize>,
108    _marker: std::marker::PhantomData<T>,
109}
110
111impl<'a, R, T> ReaderParIter<'a, R, T>
112where
113    R: IndexedReader<T>,
114{
115    pub fn new(reader: &'a R) -> Self {
116        ReaderParIter {
117            reader,
118            indices: reader.iter().collect(),
119            _marker: std::marker::PhantomData,
120        }
121    }
122}
123
124impl<'a, R, T> ParallelIterator for ReaderParIter<'a, R, T>
125where
126    R: IndexedReader<T> + Reader<T, Error: Send> + Sync,
127    T: Send,
128{
129    type Item = Result<T, <R as Reader<T>>::Error>;
130
131    fn drive_unindexed<C>(self, consumer: C) -> C::Result
132    where
133        C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
134    {
135        bridge(self, consumer)
136    }
137}
138
/// rayon producer backing `ReaderParIter`.
///
/// It holds one contiguous run of the collected index list together with
/// the shared reader borrow.
struct ReaderProducer<'a, R, T> {
    /// Reader shared by every piece produced by splitting.
    reader: &'a R,
    /// This producer's share of the indices, in original order.
    indices: Vec<usize>,
    /// Records the item type `T` without storing one.
    _marker: std::marker::PhantomData<T>,
}
144
/// Sequential iterator produced from a `ReaderProducer`.
///
/// It yields a lookup result for each remaining index, from either end.
struct ReaderProducerIter<'a, R, T> {
    /// Reader each index is looked up in.
    reader: &'a R,
    /// Remaining indices; a `Vec` iterator, so it is double-ended and
    /// exact-size.
    indices: std::vec::IntoIter<usize>,
    /// Records the item type `T` without storing one.
    _marker: std::marker::PhantomData<T>,
}
150
151impl<'a, R, T> Iterator for ReaderProducerIter<'a, R, T>
152where
153    R: Reader<T>,
154{
155    type Item = Result<T, <R as Reader<T>>::Error>;
156
157    fn next(&mut self) -> Option<Self::Item> {
158        self.indices.next().map(|i| self.reader.get(i))
159    }
160
161    fn size_hint(&self) -> (usize, Option<usize>) {
162        self.indices.size_hint()
163    }
164}
165
166impl<'a, R, T> DoubleEndedIterator for ReaderProducerIter<'a, R, T>
167where
168    R: Reader<T>,
169{
170    fn next_back(&mut self) -> Option<Self::Item> {
171        self.indices.next_back().map(|i| self.reader.get(i))
172    }
173}
174
175impl<'a, R, T> ExactSizeIterator for ReaderProducerIter<'a, R, T> where
176    R: Reader<T>
177{
178}
179
180impl<'a, R, T> Producer for ReaderProducer<'a, R, T>
181where
182    R: IndexedReader<T> + Reader<T, Error: Send> + Sync,
183    T: Send,
184{
185    type Item = Result<T, <R as Reader<T>>::Error>;
186    type IntoIter = ReaderProducerIter<'a, R, T>;
187
188    fn into_iter(self) -> Self::IntoIter {
189        ReaderProducerIter {
190            reader: self.reader,
191            indices: self.indices.into_iter(),
192            _marker: std::marker::PhantomData,
193        }
194    }
195
196    fn split_at(self, index: usize) -> (Self, Self) {
197        let right = self.indices[index..].to_vec();
198        let mut left = self.indices;
199        left.truncate(index);
200        (
201            ReaderProducer {
202                reader: self.reader,
203                indices: left,
204                _marker: std::marker::PhantomData,
205            },
206            ReaderProducer {
207                reader: self.reader,
208                indices: right,
209                _marker: std::marker::PhantomData,
210            },
211        )
212    }
213}
214
215impl<'a, R, T> IndexedParallelIterator for ReaderParIter<'a, R, T>
216where
217    R: IndexedReader<T> + Reader<T> + Sync,
218    T: Send,
219    <R as Reader<T>>::Error: Send,
220{
221    fn drive<C>(self, consumer: C) -> C::Result
222    where
223        C: rayon::iter::plumbing::Consumer<Self::Item>,
224    {
225        bridge(self, consumer)
226    }
227
228    fn len(&self) -> usize {
229        self.indices.len()
230    }
231
232    fn with_producer<CB>(self, callback: CB) -> CB::Output
233    where
234        CB: ProducerCallback<Self::Item>,
235    {
236        callback.callback(ReaderProducer {
237            reader: self.reader,
238            indices: self.indices,
239            _marker: std::marker::PhantomData,
240        })
241    }
242}
243
244impl<'a, R, T> ParIterableReader<'a, T> for R
245where
246    R: IndexedReader<T> + Reader<T> + Sync,
247    T: Send,
248    <R as Reader<T>>::Error: Send,
249{
250    type Error = <R as Reader<T>>::Error;
251
252    fn par_iter(
253        &'a self,
254    ) -> impl ParallelIterator<Item = Result<T, Self::Error>> {
255        ReaderParIter::new(self)
256    }
257}