1use anyhow::Result;
15use itertools::{self, Itertools};
16use seq_io::fastq::{self, Record, RecordSet, RefRecord};
17use seq_io::parallel_record_impl;
18use serde::{Deserialize, Serialize};
19use std::io;
20
21parallel_record_impl!(
22 parallel_interleaved_fastq,
23 parallel_interleaved_fastq_init,
24 R,
25 InterleavedFastqReader<R>,
26 InterleavedRecordSet,
27 (fastq::RefRecord, fastq::RefRecord),
28 fastq::Error
29);
30
31pub struct InterleavedFastqReader<R: std::io::Read, P = seq_io::policy::StdPolicy> {
33 pub reader: fastq::Reader<R, P>,
34 rset_len: Option<usize>,
35}
36
37impl<R, P> InterleavedFastqReader<R, P>
38where
39 R: std::io::Read,
40 P: seq_io::policy::BufPolicy + Send,
41{
42 pub fn new(reader: fastq::Reader<R, P>) -> Self {
43 Self {
44 reader,
45 rset_len: None,
46 }
47 }
48}
49
50impl<R, P> seq_io::parallel::Reader for InterleavedFastqReader<R, P>
51where
52 R: std::io::Read,
53 P: seq_io::policy::BufPolicy + Send,
54{
55 type DataSet = InterleavedRecordSet;
56 type Err = fastq::Error;
57
58 fn fill_data(
59 &mut self,
60 record: &mut Self::DataSet,
61 ) -> Option<std::result::Result<(), Self::Err>> {
62 let result = self
65 .reader
66 .read_record_set_exact(&mut record.set, self.rset_len);
67 if let Some(Ok(())) = result {
68 if self.rset_len.is_none() {
69 self.rset_len = Some(record.set.len());
70 }
71 if record.set.len() % 2 != 0 {
72 return Some(Err(fastq::Error::Io(std::io::Error::other(
73 "FASTQ file does not have an even number of records.",
74 ))));
75 }
76 }
77 result
78 }
79}
80
81#[derive(Default, Clone, Debug, Serialize, Deserialize)]
84pub struct InterleavedRecordSet {
85 set: RecordSet,
86}
87
88#[allow(clippy::into_iter_without_iter)]
89impl<'a> std::iter::IntoIterator for &'a InterleavedRecordSet {
90 type Item = (RefRecord<'a>, RefRecord<'a>);
91 type IntoIter = PairedRecordSetIterator<'a>;
92
93 #[inline]
94 fn into_iter(self) -> Self::IntoIter {
95 let iter = self.set.into_iter().tuples();
96 PairedRecordSetIterator {
97 iter: Box::new(iter),
98 }
99 }
100}
101
102impl InterleavedRecordSet {}
103
104parallel_record_impl!(
105 parallel_paired_fastq,
106 parallel_paired_fastq_init,
107 R,
108 PairedFastqReader<R>,
109 RecordSetTuple,
110 (fastq::RefRecord, fastq::RefRecord),
111 fastq::Error
112);
113
114pub struct PairedFastqReader<R: std::io::Read, P = seq_io::policy::StdPolicy> {
120 reader1: fastq::Reader<R, P>,
121 reader2: fastq::Reader<R, P>,
122 rset_len: Option<usize>,
123}
124
125impl<R, P> PairedFastqReader<R, P>
126where
127 R: std::io::Read,
128 P: seq_io::policy::BufPolicy + Send,
129{
130 pub fn new(reader1: fastq::Reader<R, P>, reader2: fastq::Reader<R, P>) -> Self {
131 Self {
132 reader1,
133 reader2,
134 rset_len: None,
135 }
136 }
137}
138
139impl<R, P> seq_io::parallel::Reader for PairedFastqReader<R, P>
140where
141 R: std::io::Read,
142 P: seq_io::policy::BufPolicy + Send,
143{
144 type DataSet = RecordSetTuple;
145 type Err = fastq::Error;
146
147 fn fill_data(
148 &mut self,
149 record: &mut Self::DataSet,
150 ) -> Option<std::result::Result<(), Self::Err>> {
151 let result1 = self
154 .reader1
155 .read_record_set_exact(&mut record.first, self.rset_len);
156 if result1.is_some() {
157 self.rset_len = Some(record.first.len());
158 }
159 let result2 = self
160 .reader2
161 .read_record_set_exact(&mut record.second, self.rset_len);
162
163 match (result1, result2) {
164 (None, None) => None,
165 (Some(Ok(())), Some(Ok(()))) => {
166 if record.first.len() == record.second.len() {
167 Some(Ok(()))
168 } else {
169 let head1: String = record.first.into_iter().last().map_or_else(
170 || "No more records".to_string(),
171 |r| String::from_utf8_lossy(r.head()).to_string(),
172 );
173 let head2 = record.second.into_iter().last().map_or_else(
174 || "No more records".to_string(),
175 |r| String::from_utf8_lossy(r.head()).to_string(),
176 );
177 Some(Err(fastq::Error::Io(std::io::Error::other(format!(
178 "FASTQ files out of sync. Last records:\n\t{head1}\n\t{head2}"
179 )))))
180 }
181 }
182 (_, Some(Err(e))) | (Some(Err(e)), _) => Some(Err(e)),
183 (None, _) => Some(Err(fastq::Error::UnexpectedEnd {
184 pos: fastq::ErrorPosition {
185 line: self.reader2.position().line(),
186 id: None,
187 },
188 })),
189 (_, None) => Some(Err(fastq::Error::UnexpectedEnd {
190 pos: fastq::ErrorPosition {
191 line: self.reader1.position().line(),
192 id: None,
193 },
194 })),
195 }
196 }
197}
198
199pub struct PairedRecordSetIterator<'a> {
201 iter: Box<dyn Iterator<Item = (RefRecord<'a>, RefRecord<'a>)> + 'a>,
202}
203
204impl<'a> Iterator for PairedRecordSetIterator<'a> {
205 type Item = (RefRecord<'a>, RefRecord<'a>);
206 fn next(&mut self) -> Option<Self::Item> {
207 self.iter.next()
208 }
209}
210
211#[derive(Default, Clone, Debug, Serialize, Deserialize)]
214pub struct RecordSetTuple {
215 first: RecordSet,
216 second: RecordSet,
217}
218
219#[allow(clippy::into_iter_without_iter)]
220impl<'a> std::iter::IntoIterator for &'a RecordSetTuple {
221 type Item = (RefRecord<'a>, RefRecord<'a>);
222 type IntoIter = PairedRecordSetIterator<'a>;
223
224 #[inline]
225 fn into_iter(self) -> Self::IntoIter {
226 #[allow(clippy::useless_conversion)]
227 let iter = self.first.into_iter().zip(self.second.into_iter());
228 PairedRecordSetIterator {
229 iter: Box::new(iter),
230 }
231 }
232}