1use anyhow::Result;
15use itertools::{self, Itertools};
16use seq_io::fastq::{self, Record, RecordSet, RefRecord};
17use seq_io::parallel::{ReusableReader, read_parallel_init};
18use seq_io::parallel_record_impl;
19use serde::{Deserialize, Serialize};
20use std::collections::BTreeMap;
21use std::io;
22
23parallel_record_impl!(
24 parallel_interleaved_fastq,
25 parallel_interleaved_fastq_init,
26 R,
27 InterleavedFastqReader<R>,
28 InterleavedRecordSet,
29 (fastq::RefRecord, fastq::RefRecord),
30 fastq::Error
31);
32
33pub struct InterleavedFastqReader<R: std::io::Read, P = seq_io::policy::StdPolicy> {
35 pub reader: fastq::Reader<R, P>,
36 rset_len: Option<usize>,
37}
38
39impl<R, P> InterleavedFastqReader<R, P>
40where
41 R: std::io::Read,
42 P: seq_io::policy::BufPolicy + Send,
43{
44 pub fn new(reader: fastq::Reader<R, P>) -> Self {
45 Self {
46 reader,
47 rset_len: None,
48 }
49 }
50}
51
52impl<R, P> seq_io::parallel::Reader for InterleavedFastqReader<R, P>
53where
54 R: std::io::Read,
55 P: seq_io::policy::BufPolicy + Send,
56{
57 type DataSet = InterleavedRecordSet;
58 type Err = fastq::Error;
59
60 fn fill_data(
61 &mut self,
62 record: &mut Self::DataSet,
63 ) -> Option<std::result::Result<(), Self::Err>> {
64 let result = self
67 .reader
68 .read_record_set_exact(&mut record.set, self.rset_len);
69 if let Some(Ok(())) = result {
70 if self.rset_len.is_none() {
71 self.rset_len = Some(record.set.len());
72 }
73 if record.set.len() % 2 != 0 {
74 return Some(Err(fastq::Error::Io(std::io::Error::other(
75 "FASTQ file does not have an even number of records.",
76 ))));
77 }
78 }
79 result
80 }
81}
82
83#[derive(Default, Clone, Debug, Serialize, Deserialize)]
86pub struct InterleavedRecordSet {
87 set: RecordSet,
88}
89
90#[allow(clippy::into_iter_without_iter)]
91impl<'a> std::iter::IntoIterator for &'a InterleavedRecordSet {
92 type Item = (RefRecord<'a>, RefRecord<'a>);
93 type IntoIter = PairedRecordSetIterator<'a>;
94
95 #[inline]
96 fn into_iter(self) -> Self::IntoIter {
97 let iter = self.set.into_iter().tuples();
98 PairedRecordSetIterator {
99 iter: Box::new(iter),
100 }
101 }
102}
103
104impl InterleavedRecordSet {}
105
106parallel_record_impl!(
107 parallel_paired_fastq,
108 parallel_paired_fastq_init,
109 R,
110 PairedFastqReader<R>,
111 RecordSetTuple,
112 (fastq::RefRecord, fastq::RefRecord),
113 fastq::Error
114);
115
116pub struct PairedFastqReader<R: std::io::Read, P = seq_io::policy::StdPolicy> {
122 reader1: fastq::Reader<R, P>,
123 reader2: fastq::Reader<R, P>,
124 rset_len: Option<usize>,
125}
126
127impl<R, P> PairedFastqReader<R, P>
128where
129 R: std::io::Read,
130 P: seq_io::policy::BufPolicy + Send,
131{
132 pub fn new(reader1: fastq::Reader<R, P>, reader2: fastq::Reader<R, P>) -> Self {
133 Self {
134 reader1,
135 reader2,
136 rset_len: None,
137 }
138 }
139}
140
141impl<R, P> seq_io::parallel::Reader for PairedFastqReader<R, P>
142where
143 R: std::io::Read,
144 P: seq_io::policy::BufPolicy + Send,
145{
146 type DataSet = RecordSetTuple;
147 type Err = fastq::Error;
148
149 fn fill_data(
150 &mut self,
151 record: &mut Self::DataSet,
152 ) -> Option<std::result::Result<(), Self::Err>> {
153 let result1 = self
156 .reader1
157 .read_record_set_exact(&mut record.first, self.rset_len);
158 if result1.is_some() {
159 self.rset_len = Some(record.first.len());
160 }
161 let result2 = self
162 .reader2
163 .read_record_set_exact(&mut record.second, self.rset_len);
164
165 match (result1, result2) {
166 (None, None) => None,
167 (Some(Ok(())), Some(Ok(()))) => {
168 if record.first.len() == record.second.len() {
169 Some(Ok(()))
170 } else {
171 let head1: String = record.first.into_iter().last().map_or_else(
172 || "No more records".to_string(),
173 |r| String::from_utf8_lossy(r.head()).to_string(),
174 );
175 let head2 = record.second.into_iter().last().map_or_else(
176 || "No more records".to_string(),
177 |r| String::from_utf8_lossy(r.head()).to_string(),
178 );
179 Some(Err(fastq::Error::Io(std::io::Error::other(format!(
180 "FASTQ files out of sync. Last records:\n\t{head1}\n\t{head2}"
181 )))))
182 }
183 }
184 (_, Some(Err(e))) | (Some(Err(e)), _) => Some(Err(e)),
185 (None, _) => Some(Err(fastq::Error::UnexpectedEnd {
186 pos: fastq::ErrorPosition {
187 line: self.reader2.position().line(),
188 id: None,
189 },
190 })),
191 (_, None) => Some(Err(fastq::Error::UnexpectedEnd {
192 pos: fastq::ErrorPosition {
193 line: self.reader1.position().line(),
194 id: None,
195 },
196 })),
197 }
198 }
199}
200
201pub struct PairedRecordSetIterator<'a> {
203 iter: Box<dyn Iterator<Item = (RefRecord<'a>, RefRecord<'a>)> + 'a>,
204}
205
206impl<'a> Iterator for PairedRecordSetIterator<'a> {
207 type Item = (RefRecord<'a>, RefRecord<'a>);
208 fn next(&mut self) -> Option<Self::Item> {
209 self.iter.next()
210 }
211}
212
213#[derive(Default, Clone, Debug, Serialize, Deserialize)]
216pub struct RecordSetTuple {
217 first: RecordSet,
218 second: RecordSet,
219}
220
221#[allow(clippy::into_iter_without_iter)]
222impl<'a> std::iter::IntoIterator for &'a RecordSetTuple {
223 type Item = (RefRecord<'a>, RefRecord<'a>);
224 type IntoIter = PairedRecordSetIterator<'a>;
225
226 #[inline]
227 fn into_iter(self) -> Self::IntoIter {
228 #[allow(clippy::useless_conversion)]
229 let iter = self.first.into_iter().zip(self.second.into_iter());
230 PairedRecordSetIterator {
231 iter: Box::new(iter),
232 }
233 }
234}
235
236pub(crate) struct OrderedDataSet<D> {
238 inner: D,
239 seq: u64,
240}
241
242impl<D: Default> Default for OrderedDataSet<D> {
243 fn default() -> Self {
244 Self {
245 inner: D::default(),
246 seq: 0,
247 }
248 }
249}
250
251pub(crate) struct OrderedReader<R: seq_io::parallel::Reader> {
254 inner: R,
255 next_seq: u64,
256}
257
258impl<R: seq_io::parallel::Reader> OrderedReader<R> {
259 pub fn new(inner: R) -> Self {
260 Self { inner, next_seq: 0 }
261 }
262}
263
264impl<R: seq_io::parallel::Reader> seq_io::parallel::Reader for OrderedReader<R>
265where
266 R::DataSet: Send,
267{
268 type DataSet = OrderedDataSet<R::DataSet>;
269 type Err = R::Err;
270
271 fn fill_data(
272 &mut self,
273 data: &mut Self::DataSet,
274 ) -> Option<std::result::Result<(), Self::Err>> {
275 let result = self.inner.fill_data(&mut data.inner);
276 if let Some(Ok(())) = &result {
277 data.seq = self.next_seq;
278 self.next_seq += 1;
279 }
280 result
281 }
282}
283
284macro_rules! ordered_parallel_record_impl {
288 ($name:ident, $reader:ty, $dataset:ty, $record:ty, $err:ty) => {
289 pub fn $name<R, D, W, F, Out>(
293 reader: $reader,
294 n_threads: u32,
295 queue_len: usize,
296 work: W,
297 mut func: F,
298 ) -> std::result::Result<Option<Out>, $err>
299 where
300 R: io::Read + Send,
301 D: Default + Send,
302 W: Send + Sync + Fn($record, &mut D),
303 F: FnMut($record, &mut D) -> Option<Out>,
304 {
305 read_parallel_init::<_, $err, _, _, _, _, $err, _, _, _>(
306 n_threads,
307 queue_len,
308 move || {
309 Ok::<_, $err>(ReusableReader::<OrderedReader<$reader>, (Vec<D>, ())>::new(
310 OrderedReader::new(reader),
311 ))
312 },
313 || Ok::<_, $err>((OrderedDataSet::<$dataset>::default(), (Vec::<D>::new(), ()))),
314 |data: &mut (OrderedDataSet<$dataset>, (Vec<D>, ()))| {
315 let recordset = &data.0.inner;
316 let out = &mut (data.1).0;
317 let mut record_iter = recordset.into_iter();
318 for (d, record) in out.iter_mut().zip(&mut record_iter) {
319 work(record, d);
320 }
321 for record in record_iter {
322 out.push(D::default());
323 work(record, out.last_mut().unwrap());
324 }
325 Ok::<_, $err>(())
326 },
327 |records| {
328 let mut next_seq: u64 = 0;
329 let mut buffer: BTreeMap<u64, ($dataset, Vec<D>)> = BTreeMap::new();
330
331 while let Some(result) = records.next() {
332 let (data, work_result) = result?;
333 work_result?;
334 let seq = data.0.seq;
335
336 if seq == next_seq {
337 for (record, d) in
338 (&data.0.inner).into_iter().zip((data.1).0.iter_mut())
339 {
340 if let Some(out) = func(record, d) {
341 return Ok(Some(out));
342 }
343 }
344 next_seq += 1;
345 while let Some((rset, mut outs)) = buffer.remove(&next_seq) {
346 for (record, d) in (&rset).into_iter().zip(outs.iter_mut()) {
347 if let Some(out) = func(record, d) {
348 return Ok(Some(out));
349 }
350 }
351 next_seq += 1;
352 }
353 } else {
354 buffer.insert(
355 seq,
356 (
357 std::mem::take(&mut data.0.inner),
358 std::mem::take(&mut (data.1).0),
359 ),
360 );
361 }
362 }
363 Ok(None)
364 },
365 )?
366 }
367 };
368}
369
370ordered_parallel_record_impl!(
371 ordered_parallel_fastq,
372 fastq::Reader<R>,
373 RecordSet,
374 fastq::RefRecord,
375 fastq::Error
376);
377
378ordered_parallel_record_impl!(
379 ordered_parallel_interleaved_fastq,
380 InterleavedFastqReader<R>,
381 InterleavedRecordSet,
382 (fastq::RefRecord, fastq::RefRecord),
383 fastq::Error
384);
385
386ordered_parallel_record_impl!(
387 ordered_parallel_paired_fastq,
388 PairedFastqReader<R>,
389 RecordSetTuple,
390 (fastq::RefRecord, fastq::RefRecord),
391 fastq::Error
392);