1use anyhow::{bail, Context, Result};
2use num_format::{Locale, ToFormattedString};
3use std::{
4 collections::VecDeque,
5 fmt,
6 fs::File,
7 io::{BufRead, BufReader, Write},
8 sync::{
9 atomic::{AtomicBool, AtomicU32, Ordering},
10 Arc, Mutex,
11 },
12};
13use flate2::read::MultiGzDecoder;
14
15use crate::parse::RawSequenceRead;
16
17pub fn read_fastq(
25 fastq: String,
26 seq_clone: Arc<Mutex<VecDeque<String>>>,
27 exit_clone: Arc<AtomicBool>,
28 total_reads_arc: Arc<AtomicU32>,
29) -> Result<()> {
30
31 let mut fastq_line_reader = FastqLineReader::new(seq_clone, exit_clone);
33 let fastq_file = File::open(&fastq).context(format!("Failed to open file: {}", fastq))?; if !fastq.ends_with("fastq.gz") {
36 if !fastq.ends_with("fastq") {
38 bail!("This program only works with *.fastq files and *.fastq.gz files. The latter is still experimental")
39 }
40
41 let mut stdout = std::io::stdout();
43 let mut lock = stdout.lock();
44 for line_result in BufReader::new(fastq_file).lines() {
45 let mut line =
46 line_result.context(format!("Bufread could not read line for file: {}", fastq))?;
47 line.push('\n');
48 fastq_line_reader.read(line);
50 if fastq_line_reader.line_num == 4 {
51 fastq_line_reader.post()?;
52 }
53 if fastq_line_reader.total_reads % 10000 == 0 {
55 write!(lock, "{}", fastq_line_reader)?;
56 stdout.flush()?;
57 }
58 }
59 } else {
60 println!("If this program stops reading before the expected number of sequencing reads, unzip the gzipped fastq and rerun.");
61 println!();
62 let mut reader = BufReader::new(MultiGzDecoder::new(fastq_file));
64
65 let mut stdout = std::io::stdout();
66 let mut lock = stdout.lock();
67 let mut read_response = 10;
68 while read_response != 0 {
70 let mut line = String::new();
71 read_response = reader.read_line(&mut line)?;
72 fastq_line_reader.read(line);
74 if fastq_line_reader.line_num == 4 {
75 fastq_line_reader.post()?;
76 }
77 if fastq_line_reader.total_reads % 10000 == 0 {
79 write!(lock, "{}", fastq_line_reader)?;
80 stdout.flush()?;
81 }
82 }
83 }
84 print!("{}", fastq_line_reader);
86 total_reads_arc.store(fastq_line_reader.total_reads, Ordering::Relaxed);
87 println!();
88 Ok(())
89}
90
/// Incremental assembler for four-line FASTQ records that hands each finished
/// record to other threads through a shared queue.
struct FastqLineReader {
    /// Shared queue that completed records are pushed onto.
    seq_clone: Arc<Mutex<VecDeque<String>>>,
    /// Shared flag; once set, `read` stops waiting for space in the queue.
    exit_clone: Arc<AtomicBool>,
    /// Position (1-4) of the most recent line within its record; 0 before any line.
    line_num: u8,
    /// Number of records started so far.
    total_reads: u32,
    /// Text of the record currently being assembled.
    raw_sequence_read_string: String,
    /// `true` until the first record has been run through the FASTQ format check.
    test: bool,
}
100
101impl FastqLineReader {
102 pub fn new(seq_clone: Arc<Mutex<VecDeque<String>>>, exit_clone: Arc<AtomicBool>) -> Self {
104 FastqLineReader {
105 test: true,
106 line_num: 0,
107 total_reads: 0,
108 raw_sequence_read_string: String::new(),
109 seq_clone,
110 exit_clone,
111 }
112 }
113
114 pub fn read(&mut self, line: String) {
116 while self.seq_clone.lock().unwrap().len() >= 10000 {
118 if self.exit_clone.load(Ordering::Relaxed) {
120 break;
121 }
122 }
123 self.line_num += 1;
125 if self.line_num == 5 {
126 self.line_num = 1
127 }
128 if self.line_num == 1 {
129 self.total_reads += 1;
130 self.raw_sequence_read_string = line;
131 } else {
132 self.raw_sequence_read_string.push_str(&line);
133 }
134 }
135
136 pub fn post(&mut self) -> Result<()> {
137 self.raw_sequence_read_string.pop(); if self.test {
140 RawSequenceRead::unpack(self.raw_sequence_read_string.clone())?.check_fastq_format()?;
141 self.test = false;
142 }
143 self.seq_clone
144 .lock()
145 .unwrap()
146 .push_front(self.raw_sequence_read_string.clone());
147 Ok(())
148 }
149}
150
151impl fmt::Display for FastqLineReader {
152 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
153 write!(
154 f,
155 "Total sequences: {}\r",
156 self.total_reads.to_formatted_string(&Locale::en)
157 )
158 }
159}