barcode_count/
input.rs

1use anyhow::{bail, Context, Result};
2use num_format::{Locale, ToFormattedString};
3use std::{
4    collections::VecDeque,
5    fmt,
6    fs::File,
7    io::{BufRead, BufReader, Write},
8    sync::{
9        atomic::{AtomicBool, AtomicU32, Ordering},
10        Arc, Mutex,
11    },
12};
13use flate2::read::MultiGzDecoder;
14
15use crate::parse::RawSequenceRead;
16
17/// Reads in the FASTQ file line by line, then pushes every 2 out of 4 lines, which corresponds to the sequence line, into a Vec that is passed to other threads
18///
19/// FASTQ format:
20/// Line 1: Sequence ID
21/// Line 2: DNA sequence
22/// Line 3: +
23/// Line 4: Quality score
24pub fn read_fastq(
25    fastq: String,
26    seq_clone: Arc<Mutex<VecDeque<String>>>,
27    exit_clone: Arc<AtomicBool>,
28    total_reads_arc: Arc<AtomicU32>,
29) -> Result<()> {
30
31    // Create a fastq line reader which keeps track of line number, reads, and posts the sequence to the shared vector
32    let mut fastq_line_reader = FastqLineReader::new(seq_clone, exit_clone);
33    let fastq_file = File::open(&fastq).context(format!("Failed to open file: {}", fastq))?; // open file
34    // If the file is not gzipped use BufReader to read in lines
35    if !fastq.ends_with("fastq.gz") {
36        // If the file does not end with fastq, return with an error
37        if !fastq.ends_with("fastq") {
38            bail!("This program only works with *.fastq files and *.fastq.gz files.  The latter is still experimental")
39        }
40
41        // go line by line
42        let mut stdout = std::io::stdout();
43        let mut lock = stdout.lock();
44        for line_result in BufReader::new(fastq_file).lines() {
45            let mut line =
46                line_result.context(format!("Bufread could not read line for file: {}", fastq))?;
47            line.push('\n');
48            // post the line to the shared vector and keep track of the number of sequences etc
49            fastq_line_reader.read(line);
50            if fastq_line_reader.line_num == 4 {
51                fastq_line_reader.post()?;
52            }
53            // Add to read count to print numnber of sequences read by this thread
54            if fastq_line_reader.total_reads % 10000 == 0 {
55                write!(lock, "{}", fastq_line_reader)?;
56                stdout.flush()?;
57            }
58        }
59    } else {
60        println!("If this program stops reading before the expected number of sequencing reads, unzip the gzipped fastq and rerun.");
61        println!();
62        // stream in first by decoding with GzDecoder, the reading into buffer
63        let mut reader = BufReader::new(MultiGzDecoder::new(fastq_file));
64
65        let mut stdout = std::io::stdout();
66        let mut lock = stdout.lock();
67        let mut read_response = 10;
68        // continue reading until there is a response of 0, which indicates the end of file.  This may be where some gzipped files abrupty end
69        while read_response != 0 {
70            let mut line = String::new();
71            read_response = reader.read_line(&mut line)?;
72            // post the line to the shared vector and keep track of the number of sequences etc
73            fastq_line_reader.read(line);
74            if fastq_line_reader.line_num == 4 {
75                fastq_line_reader.post()?;
76            }
77            // Add to read count to print numnber of sequences read by this thread
78            if fastq_line_reader.total_reads % 10000 == 0 {
79                write!(lock, "{}", fastq_line_reader)?;
80                stdout.flush()?;
81            }
82        }
83    }
84    // Display the final total read count
85    print!("{}", fastq_line_reader);
86    total_reads_arc.store(fastq_line_reader.total_reads, Ordering::Relaxed);
87    println!();
88    Ok(())
89}
90
91/// A struct with functions for keeping track of read information and to post sequence lines to the shared vector
92struct FastqLineReader {
93    test: bool,   // whether or not to test the fastq format. Only does this for the first read
94    line_num: u8, // the current line number 1-4.  Resets back to 1
95    total_reads: u32, // total sequences read within the fastq file
96    raw_sequence_read_string: String,
97    seq_clone: Arc<Mutex<VecDeque<String>>>, // the vector that is passed between threads which containst the sequences
98    exit_clone: Arc<AtomicBool>, // a bool which is set to true when one of the other threads panic.  This is the prevent hanging and is used to exit this thread
99}
100
101impl FastqLineReader {
102    /// Creates a new FastqLineReader struct
103    pub fn new(seq_clone: Arc<Mutex<VecDeque<String>>>, exit_clone: Arc<AtomicBool>) -> Self {
104        FastqLineReader {
105            test: true,
106            line_num: 0,
107            total_reads: 0,
108            raw_sequence_read_string: String::new(),
109            seq_clone,
110            exit_clone,
111        }
112    }
113
114    /// Reads in the line and either passes to the vec or discards it, depending if it is a sequence line.  Also increments on line count, sequence count etc.
115    pub fn read(&mut self, line: String) {
116        // Pause if there are already 10000 sequences in the vec so memory is not overloaded
117        while self.seq_clone.lock().unwrap().len() >= 10000 {
118            // if threads have failed exit out of this thread
119            if self.exit_clone.load(Ordering::Relaxed) {
120                break;
121            }
122        }
123        // increase line number and if it has passed line 4, reset to 1
124        self.line_num += 1;
125        if self.line_num == 5 {
126            self.line_num = 1
127        }
128        if self.line_num == 1 {
129            self.total_reads += 1;
130            self.raw_sequence_read_string = line;
131        } else {
132            self.raw_sequence_read_string.push_str(&line);
133        }
134    }
135
136    pub fn post(&mut self) -> Result<()> {
137        self.raw_sequence_read_string.pop(); // removes the last \n
138                                             // Insert the sequence into the vec.  This will be popped out by other threads
139        if self.test {
140            RawSequenceRead::unpack(self.raw_sequence_read_string.clone())?.check_fastq_format()?;
141            self.test = false;
142        }
143        self.seq_clone
144            .lock()
145            .unwrap()
146            .push_front(self.raw_sequence_read_string.clone());
147        Ok(())
148    }
149}
150
151impl fmt::Display for FastqLineReader {
152    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
153        write!(
154            f,
155            "Total sequences:             {}\r",
156            self.total_reads.to_formatted_string(&Locale::en)
157        )
158    }
159}