// fqgrep_lib/src/seq_io.rs
//! Additions to `seq_io::parallel` for interleaved paired end FASTQ files.
//!
//! Relies on: <https://github.com/markschl/seq_io/pull/23>.
//!
//! Adds support to use the `seq_io::parallel` module to read interleaved paired end FASTQ files as
//! well as read pairs across pairs of FASTQ files.
//!
//! For interleaved paired end FASTQ files, a custom reader `InterleavedFastqReader` is required to
//! read an even number of records, since consecutive pairs are assumed to be read pairs (mates).
//!
//! For pairs across pairs of FASTQ files, a custom reader `PairedFastqReader` is required to
//! synchronize records from the input files, ensuring the same number of records are read from
//! each file.
14use anyhow::Result;
15use itertools::{self, Itertools};
16use seq_io::fastq::{self, Record, RecordSet, RefRecord};
17use seq_io::parallel::{ReusableReader, read_parallel_init};
18use seq_io::parallel_record_impl;
19use serde::{Deserialize, Serialize};
20use std::collections::BTreeMap;
21use std::io;
22
// Generates `parallel_interleaved_fastq` / `parallel_interleaved_fastq_init`:
// parallel processing over a single interleaved FASTQ input, yielding one
// record pair (mates) per work item.
parallel_record_impl!(
    parallel_interleaved_fastq,
    parallel_interleaved_fastq_init,
    R,
    InterleavedFastqReader<R>,
    InterleavedRecordSet,
    (fastq::RefRecord, fastq::RefRecord),
    fastq::Error
);
32
/// Reader that reads an interleaved paired end FASTQ file.
pub struct InterleavedFastqReader<R: std::io::Read, P = seq_io::policy::StdPolicy> {
    /// The underlying single-file FASTQ reader.
    pub reader: fastq::Reader<R, P>,
    /// Number of records to request per record set; `None` until the first
    /// read determines how many records fit into the buffer.
    rset_len: Option<usize>,
}
38
39impl<R, P> InterleavedFastqReader<R, P>
40where
41    R: std::io::Read,
42    P: seq_io::policy::BufPolicy + Send,
43{
44    pub fn new(reader: fastq::Reader<R, P>) -> Self {
45        Self {
46            reader,
47            rset_len: None,
48        }
49    }
50}
51
52impl<R, P> seq_io::parallel::Reader for InterleavedFastqReader<R, P>
53where
54    R: std::io::Read,
55    P: seq_io::policy::BufPolicy + Send,
56{
57    type DataSet = InterleavedRecordSet;
58    type Err = fastq::Error;
59
60    fn fill_data(
61        &mut self,
62        record: &mut Self::DataSet,
63    ) -> Option<std::result::Result<(), Self::Err>> {
64        // self.rset_len will be None when fill_data is called for the first time, and from then
65        // on it will have a value.
66        let result = self
67            .reader
68            .read_record_set_exact(&mut record.set, self.rset_len);
69        if let Some(Ok(())) = result {
70            if self.rset_len.is_none() {
71                self.rset_len = Some(record.set.len());
72            }
73            if record.set.len() % 2 != 0 {
74                return Some(Err(fastq::Error::Io(std::io::Error::other(
75                    "FASTQ file does not have an even number of records.",
76                ))));
77            }
78        }
79        result
80    }
81}
82
/// Thin wrapper around a ``RecordSet`` that contains an even number of records, with interleaved
/// read pairs (consecutive records are mates).
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct InterleavedRecordSet {
    // The wrapped record set; `InterleavedFastqReader::fill_data` guarantees
    // that its length is even.
    set: RecordSet,
}
89
90#[allow(clippy::into_iter_without_iter)]
91impl<'a> std::iter::IntoIterator for &'a InterleavedRecordSet {
92    type Item = (RefRecord<'a>, RefRecord<'a>);
93    type IntoIter = PairedRecordSetIterator<'a>;
94
95    #[inline]
96    fn into_iter(self) -> Self::IntoIter {
97        let iter = self.set.into_iter().tuples();
98        PairedRecordSetIterator {
99            iter: Box::new(iter),
100        }
101    }
102}
103
// NOTE(review): an empty `impl InterleavedRecordSet {}` block was here; it
// declared nothing and has been removed as dead code.
105
// Generates `parallel_paired_fastq` / `parallel_paired_fastq_init`:
// parallel processing over two synchronized FASTQ inputs (one per end),
// yielding one record pair per work item.
parallel_record_impl!(
    parallel_paired_fastq,
    parallel_paired_fastq_init,
    R,
    PairedFastqReader<R>,
    RecordSetTuple,
    (fastq::RefRecord, fastq::RefRecord),
    fastq::Error
);
115
/// Reader for paired ends that are spread across two FASTQ files.
///
/// The number of records read from each file is synchronized, but the number is not known until
/// after reading the first time, when we determine how many reads can fit into the fixed size
/// buffer.  While the buffer may grow for each later `RecordSet` (if later reads are
/// longer), this is a good approximation given the buffer size.
pub struct PairedFastqReader<R: std::io::Read, P = seq_io::policy::StdPolicy> {
    /// Reader for the first end of each pair (read 1).
    reader1: fastq::Reader<R, P>,
    /// Reader for the second end of each pair (read 2).
    reader2: fastq::Reader<R, P>,
    /// Number of records to read from *each* file per batch; `None` until the
    /// first read from `reader1` establishes it.
    rset_len: Option<usize>,
}
126
127impl<R, P> PairedFastqReader<R, P>
128where
129    R: std::io::Read,
130    P: seq_io::policy::BufPolicy + Send,
131{
132    pub fn new(reader1: fastq::Reader<R, P>, reader2: fastq::Reader<R, P>) -> Self {
133        Self {
134            reader1,
135            reader2,
136            rset_len: None,
137        }
138    }
139}
140
impl<R, P> seq_io::parallel::Reader for PairedFastqReader<R, P>
where
    R: std::io::Read,
    P: seq_io::policy::BufPolicy + Send,
{
    type DataSet = RecordSetTuple;
    type Err = fastq::Error;

    /// Fills `record.first` from `reader1` and `record.second` from `reader2`,
    /// requesting the same number of records from each so the two sets stay in
    /// sync.  Returns `None` when both files end together, and `Some(Err(..))`
    /// on a read error, a record-count mismatch, or when only one file ends.
    fn fill_data(
        &mut self,
        record: &mut Self::DataSet,
    ) -> Option<std::result::Result<(), Self::Err>> {
        // self.rset_len will be None when fill_data is called for the first time, and from then
        // on it will have a value.
        let result1 = self
            .reader1
            .read_record_set_exact(&mut record.first, self.rset_len);
        // Re-capture the batch size after every read (not just the first) so a
        // short final batch from reader1 caps how many records are requested
        // from reader2 below.
        // NOTE(review): this also runs when result1 is Some(Err(..)); presumably
        // record.first is then empty/partial — confirm against
        // read_record_set_exact's contract.
        if result1.is_some() {
            self.rset_len = Some(record.first.len());
        }
        let result2 = self
            .reader2
            .read_record_set_exact(&mut record.second, self.rset_len);

        match (result1, result2) {
            // Both files hit end-of-input together: clean termination.
            (None, None) => None,
            (Some(Ok(())), Some(Ok(()))) => {
                if record.first.len() == record.second.len() {
                    Some(Ok(()))
                } else {
                    // Out of sync: include the last record header read from each
                    // file so the user can locate the divergence.
                    let head1: String = record.first.into_iter().last().map_or_else(
                        || "No more records".to_string(),
                        |r| String::from_utf8_lossy(r.head()).to_string(),
                    );
                    let head2 = record.second.into_iter().last().map_or_else(
                        || "No more records".to_string(),
                        |r| String::from_utf8_lossy(r.head()).to_string(),
                    );
                    Some(Err(fastq::Error::Io(std::io::Error::other(format!(
                        "FASTQ files out of sync.  Last records:\n\t{head1}\n\t{head2}"
                    )))))
                }
            }
            // Propagate a read error from either file (checked before the
            // one-sided-EOF arms below, so errors win over UnexpectedEnd).
            (_, Some(Err(e))) | (Some(Err(e)), _) => Some(Err(e)),
            // File 1 ended while file 2 still has records; report the line the
            // surviving reader has reached.
            (None, _) => Some(Err(fastq::Error::UnexpectedEnd {
                pos: fastq::ErrorPosition {
                    line: self.reader2.position().line(),
                    id: None,
                },
            })),
            // File 2 ended while file 1 still has records.
            (_, None) => Some(Err(fastq::Error::UnexpectedEnd {
                pos: fastq::ErrorPosition {
                    line: self.reader1.position().line(),
                    id: None,
                },
            })),
        }
    }
}
200
/// Iterator over paired end reads from two FASTQ files.
pub struct PairedRecordSetIterator<'a> {
    // Boxed so the same type can wrap either a `tuples()` adaptor (interleaved
    // input) or a `zip` of two record sets (paired files).
    iter: Box<dyn Iterator<Item = (RefRecord<'a>, RefRecord<'a>)> + 'a>,
}
205
206impl<'a> Iterator for PairedRecordSetIterator<'a> {
207    type Item = (RefRecord<'a>, RefRecord<'a>);
208    fn next(&mut self) -> Option<Self::Item> {
209        self.iter.next()
210    }
211}
212
/// Stores two record sets with the same number of records for paired end reads.  Each record set
/// is used to read from the corresponding reader, one per end of a pair.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct RecordSetTuple {
    /// Records from the first file (read 1 of each pair).
    first: RecordSet,
    /// Records from the second file (read 2 of each pair).
    second: RecordSet,
}
220
221#[allow(clippy::into_iter_without_iter)]
222impl<'a> std::iter::IntoIterator for &'a RecordSetTuple {
223    type Item = (RefRecord<'a>, RefRecord<'a>);
224    type IntoIter = PairedRecordSetIterator<'a>;
225
226    #[inline]
227    fn into_iter(self) -> Self::IntoIter {
228        #[allow(clippy::useless_conversion)]
229        let iter = self.first.into_iter().zip(self.second.into_iter());
230        PairedRecordSetIterator {
231            iter: Box::new(iter),
232        }
233    }
234}
235
/// A data set wrapper that carries a dispatch sequence number for ordered output.
pub(crate) struct OrderedDataSet<D> {
    /// The wrapped record data set.
    inner: D,
    /// Sequence number stamped by `OrderedReader::fill_data`; used by the
    /// collector to restore dispatch order.
    seq: u64,
}
241
242impl<D: Default> Default for OrderedDataSet<D> {
243    fn default() -> Self {
244        Self {
245            inner: D::default(),
246            seq: 0,
247        }
248    }
249}
250
/// Wraps a reader to assign monotonically increasing sequence numbers to each batch.
/// Sequence numbers are assigned in `fill_data`, which runs sequentially in the reader thread.
pub(crate) struct OrderedReader<R: seq_io::parallel::Reader> {
    /// The wrapped reader that actually fills record sets.
    inner: R,
    /// Sequence number to stamp on the next successfully filled batch.
    next_seq: u64,
}
257
258impl<R: seq_io::parallel::Reader> OrderedReader<R> {
259    pub fn new(inner: R) -> Self {
260        Self { inner, next_seq: 0 }
261    }
262}
263
264impl<R: seq_io::parallel::Reader> seq_io::parallel::Reader for OrderedReader<R>
265where
266    R::DataSet: Send,
267{
268    type DataSet = OrderedDataSet<R::DataSet>;
269    type Err = R::Err;
270
271    fn fill_data(
272        &mut self,
273        data: &mut Self::DataSet,
274    ) -> Option<std::result::Result<(), Self::Err>> {
275        let result = self.inner.fill_data(&mut data.inner);
276        if let Some(Ok(())) = &result {
277            data.seq = self.next_seq;
278            self.next_seq += 1;
279        }
280        result
281    }
282}
283
/// Generates an ordered parallel processing function that preserves input order in output.
/// Uses `OrderedReader` to tag batches with sequence numbers and a `BTreeMap` reorder buffer
/// to yield results in dispatch order rather than completion order.
macro_rules! ordered_parallel_record_impl {
    ($name:ident, $reader:ty, $dataset:ty, $record:ty, $err:ty) => {
        /// Processes records in parallel while preserving input order in the output.
        /// When a batch arrives out of order, its record set and per-record data are moved
        /// into a reorder buffer via `mem::take` and yielded later in the correct sequence.
        ///
        /// `work` is called once per record on worker threads; `func` is called
        /// once per record in input order and may short-circuit the whole run by
        /// returning `Some(out)`.
        pub fn $name<R, D, W, F, Out>(
            reader: $reader,
            n_threads: u32,
            queue_len: usize,
            work: W,
            mut func: F,
        ) -> std::result::Result<Option<Out>, $err>
        where
            R: io::Read + Send,
            D: Default + Send,
            W: Send + Sync + Fn($record, &mut D),
            F: FnMut($record, &mut D) -> Option<Out>,
        {
            read_parallel_init::<_, $err, _, _, _, _, $err, _, _, _>(
                n_threads,
                queue_len,
                // Reader factory: wrap the caller's reader so every batch is
                // stamped with a monotonically increasing sequence number.
                move || {
                    Ok::<_, $err>(ReusableReader::<OrderedReader<$reader>, (Vec<D>, ())>::new(
                        OrderedReader::new(reader),
                    ))
                },
                // Dataset factory: a (record set, per-record outputs) pair.
                || Ok::<_, $err>((OrderedDataSet::<$dataset>::default(), (Vec::<D>::new(), ()))),
                // Worker: run `work` on every record, reusing `out` slots left
                // over from earlier batches and growing the Vec when this batch
                // has more records than slots.
                |data: &mut (OrderedDataSet<$dataset>, (Vec<D>, ()))| {
                    let recordset = &data.0.inner;
                    let out = &mut (data.1).0;
                    let mut record_iter = recordset.into_iter();
                    for (d, record) in out.iter_mut().zip(&mut record_iter) {
                        work(record, d);
                    }
                    // More records than reusable slots: push fresh defaults.
                    for record in record_iter {
                        out.push(D::default());
                        work(record, out.last_mut().unwrap());
                    }
                    Ok::<_, $err>(())
                },
                // Collector: batches arrive in completion order; emit them to
                // `func` in dispatch (sequence) order using a reorder buffer.
                |records| {
                    // Sequence number of the next batch to emit.
                    let mut next_seq: u64 = 0;
                    // Batches that completed early, keyed by sequence number.
                    let mut buffer: BTreeMap<u64, ($dataset, Vec<D>)> = BTreeMap::new();

                    while let Some(result) = records.next() {
                        let (data, work_result) = result?;
                        work_result?;
                        let seq = data.0.seq;

                        if seq == next_seq {
                            // In-order batch: emit its records directly.
                            for (record, d) in
                                (&data.0.inner).into_iter().zip((data.1).0.iter_mut())
                            {
                                if let Some(out) = func(record, d) {
                                    return Ok(Some(out));
                                }
                            }
                            next_seq += 1;
                            // Drain any buffered batches that are now contiguous
                            // with the emitted prefix.
                            while let Some((rset, mut outs)) = buffer.remove(&next_seq) {
                                for (record, d) in (&rset).into_iter().zip(outs.iter_mut()) {
                                    if let Some(out) = func(record, d) {
                                        return Ok(Some(out));
                                    }
                                }
                                next_seq += 1;
                            }
                        } else {
                            // Out-of-order batch: take ownership of its contents
                            // (leaving defaults behind so the reusable dataset can
                            // return to the pool) and park it until its turn.
                            buffer.insert(
                                seq,
                                (
                                    std::mem::take(&mut data.0.inner),
                                    std::mem::take(&mut (data.1).0),
                                ),
                            );
                        }
                    }
                    // All batches consumed without `func` short-circuiting.
                    Ok(None)
                },
            )?
        }
    };
}
369
// Ordered single-end FASTQ processing: one `RefRecord` per work item.
ordered_parallel_record_impl!(
    ordered_parallel_fastq,
    fastq::Reader<R>,
    RecordSet,
    fastq::RefRecord,
    fastq::Error
);

// Ordered interleaved paired-end processing: one mate pair per work item,
// read from a single interleaved FASTQ file.
ordered_parallel_record_impl!(
    ordered_parallel_interleaved_fastq,
    InterleavedFastqReader<R>,
    InterleavedRecordSet,
    (fastq::RefRecord, fastq::RefRecord),
    fastq::Error
);

// Ordered paired-end processing across two FASTQ files: one mate pair per
// work item, one record drawn from each file.
ordered_parallel_record_impl!(
    ordered_parallel_paired_fastq,
    PairedFastqReader<R>,
    RecordSetTuple,
    (fastq::RefRecord, fastq::RefRecord),
    fastq::Error
);