fqgrep_lib/
seq_io.rs

1/// Additions to `seq_io::parallel` for interleaved paired end FASTQ files.
2///
3/// Relies on: <https://github.com/markschl/seq_io/pull/23>.
4///
5/// Adds support to use the `seq_io::parallel` module to read interleaved paired end FASTQ files as
6/// well as read pairs across pairs of FASTQ files.
7///
8/// For interleaved paired end FASTQ files, a custom reader `InterleavedFastqReader` is required to
9/// read an even number of records, since consecutive pairs are assumed to be read pairs (mates).
10///
11/// For pairs across pairs of FASTQ files, a custom reader `PairsAcrossPairsReader` is required to
12/// synchronize records from the input files, ensuring the same number records are read from each
13/// file.
14use anyhow::Result;
15use itertools::{self, Itertools};
16use seq_io::fastq::{self, Record, RecordSet, RefRecord};
17use seq_io::parallel_record_impl;
18use serde::{Deserialize, Serialize};
19use std::io;
20
21parallel_record_impl!(
22    parallel_interleaved_fastq,
23    parallel_interleaved_fastq_init,
24    R,
25    InterleavedFastqReader<R>,
26    InterleavedRecordSet,
27    (fastq::RefRecord, fastq::RefRecord),
28    fastq::Error
29);
30
31/// Read that reads an interleaved paired end FASTQ file.
32pub struct InterleavedFastqReader<R: std::io::Read, P = seq_io::policy::StdPolicy> {
33    pub reader: fastq::Reader<R, P>,
34    rset_len: Option<usize>,
35}
36
37impl<R, P> InterleavedFastqReader<R, P>
38where
39    R: std::io::Read,
40    P: seq_io::policy::BufPolicy + Send,
41{
42    pub fn new(reader: fastq::Reader<R, P>) -> Self {
43        Self {
44            reader,
45            rset_len: None,
46        }
47    }
48}
49
50impl<R, P> seq_io::parallel::Reader for InterleavedFastqReader<R, P>
51where
52    R: std::io::Read,
53    P: seq_io::policy::BufPolicy + Send,
54{
55    type DataSet = InterleavedRecordSet;
56    type Err = fastq::Error;
57
58    fn fill_data(
59        &mut self,
60        record: &mut Self::DataSet,
61    ) -> Option<std::result::Result<(), Self::Err>> {
62        // self.rset_len will be None when fill_data is called for the first time, and from then
63        // on it will have a value.
64        let result = self
65            .reader
66            .read_record_set_exact(&mut record.set, self.rset_len);
67        if let Some(Ok(())) = result {
68            if self.rset_len.is_none() {
69                self.rset_len = Some(record.set.len());
70            }
71            if record.set.len() % 2 != 0 {
72                return Some(Err(fastq::Error::Io(std::io::Error::other(
73                    "FASTQ file does not have an even number of records.",
74                ))));
75            }
76        }
77        result
78    }
79}
80
81/// Thin wrapper around a ``RecordSet`` that contains an even number of records, with interleaved
82/// read spairs.
83#[derive(Default, Clone, Debug, Serialize, Deserialize)]
84pub struct InterleavedRecordSet {
85    set: RecordSet,
86}
87
88#[allow(clippy::into_iter_without_iter)]
89impl<'a> std::iter::IntoIterator for &'a InterleavedRecordSet {
90    type Item = (RefRecord<'a>, RefRecord<'a>);
91    type IntoIter = PairedRecordSetIterator<'a>;
92
93    #[inline]
94    fn into_iter(self) -> Self::IntoIter {
95        let iter = self.set.into_iter().tuples();
96        PairedRecordSetIterator {
97            iter: Box::new(iter),
98        }
99    }
100}
101
102impl InterleavedRecordSet {}
103
104parallel_record_impl!(
105    parallel_paired_fastq,
106    parallel_paired_fastq_init,
107    R,
108    PairedFastqReader<R>,
109    RecordSetTuple,
110    (fastq::RefRecord, fastq::RefRecord),
111    fastq::Error
112);
113
114/// Reader for paired ends that are spread across two FASTQ files.
115/// The number of records read from each file is synchronized, but the number is not known until
116/// after reading the first time, when we determine how many reads can fit into the fixed size
117/// buffer.  While the buffer may grow later for each `RecordSet` later (if later reads are
118/// longer), this is a good approximation given the buffer size.
119pub struct PairedFastqReader<R: std::io::Read, P = seq_io::policy::StdPolicy> {
120    reader1: fastq::Reader<R, P>,
121    reader2: fastq::Reader<R, P>,
122    rset_len: Option<usize>,
123}
124
125impl<R, P> PairedFastqReader<R, P>
126where
127    R: std::io::Read,
128    P: seq_io::policy::BufPolicy + Send,
129{
130    pub fn new(reader1: fastq::Reader<R, P>, reader2: fastq::Reader<R, P>) -> Self {
131        Self {
132            reader1,
133            reader2,
134            rset_len: None,
135        }
136    }
137}
138
139impl<R, P> seq_io::parallel::Reader for PairedFastqReader<R, P>
140where
141    R: std::io::Read,
142    P: seq_io::policy::BufPolicy + Send,
143{
144    type DataSet = RecordSetTuple;
145    type Err = fastq::Error;
146
147    fn fill_data(
148        &mut self,
149        record: &mut Self::DataSet,
150    ) -> Option<std::result::Result<(), Self::Err>> {
151        // self.rset_len will be None when fill_data is called for the first time, and from then
152        // on it will have a value.
153        let result1 = self
154            .reader1
155            .read_record_set_exact(&mut record.first, self.rset_len);
156        if result1.is_some() {
157            self.rset_len = Some(record.first.len());
158        }
159        let result2 = self
160            .reader2
161            .read_record_set_exact(&mut record.second, self.rset_len);
162
163        match (result1, result2) {
164            (None, None) => None,
165            (Some(Ok(())), Some(Ok(()))) => {
166                if record.first.len() == record.second.len() {
167                    Some(Ok(()))
168                } else {
169                    let head1: String = record.first.into_iter().last().map_or_else(
170                        || "No more records".to_string(),
171                        |r| String::from_utf8_lossy(r.head()).to_string(),
172                    );
173                    let head2 = record.second.into_iter().last().map_or_else(
174                        || "No more records".to_string(),
175                        |r| String::from_utf8_lossy(r.head()).to_string(),
176                    );
177                    Some(Err(fastq::Error::Io(std::io::Error::other(format!(
178                        "FASTQ files out of sync.  Last records:\n\t{head1}\n\t{head2}"
179                    )))))
180                }
181            }
182            (_, Some(Err(e))) | (Some(Err(e)), _) => Some(Err(e)),
183            (None, _) => Some(Err(fastq::Error::UnexpectedEnd {
184                pos: fastq::ErrorPosition {
185                    line: self.reader2.position().line(),
186                    id: None,
187                },
188            })),
189            (_, None) => Some(Err(fastq::Error::UnexpectedEnd {
190                pos: fastq::ErrorPosition {
191                    line: self.reader1.position().line(),
192                    id: None,
193                },
194            })),
195        }
196    }
197}
198
199/// Iterator over paired end reads from two FASTQ files.
200pub struct PairedRecordSetIterator<'a> {
201    iter: Box<dyn Iterator<Item = (RefRecord<'a>, RefRecord<'a>)> + 'a>,
202}
203
204impl<'a> Iterator for PairedRecordSetIterator<'a> {
205    type Item = (RefRecord<'a>, RefRecord<'a>);
206    fn next(&mut self) -> Option<Self::Item> {
207        self.iter.next()
208    }
209}
210
211/// Stores two record sets with the same number of records for paired end reads.  Each record set
212/// is used to read from the corresponding reader, one per end of a pair.
213#[derive(Default, Clone, Debug, Serialize, Deserialize)]
214pub struct RecordSetTuple {
215    first: RecordSet,
216    second: RecordSet,
217}
218
219#[allow(clippy::into_iter_without_iter)]
220impl<'a> std::iter::IntoIterator for &'a RecordSetTuple {
221    type Item = (RefRecord<'a>, RefRecord<'a>);
222    type IntoIter = PairedRecordSetIterator<'a>;
223
224    #[inline]
225    fn into_iter(self) -> Self::IntoIter {
226        #[allow(clippy::useless_conversion)]
227        let iter = self.first.into_iter().zip(self.second.into_iter());
228        PairedRecordSetIterator {
229            iter: Box::new(iter),
230        }
231    }
232}