1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
use custom_error::custom_error;
use flate2::read::GzDecoder;
use num_format::{Locale, ToFormattedString};
use std::{
    collections::VecDeque,
    error::Error,
    fmt,
    fs::File,
    io::{BufRead, BufReader, Write},
    sync::{
        atomic::{AtomicBool, AtomicU32, Ordering},
        Arc, Mutex,
    },
};

// Errors associated with checking the fastq format to make sure it is correct
custom_error! {FastqError
    NotFastq = "This program only works with *.fastq files and *.fastq.gz files.  The latter is still experimental",
    LeftOver = "Sequence lines left over.  Check code",
}

/// Reads in the FASTQ file line by line, then pushes every 2 out of 4 lines, which corresponds to the sequence line, into a Vec that is passed to other threads
///
/// FASTQ format:
/// Line 1: Sequence ID
/// Line 2: DNA sequence
/// Line 3: +
/// Line 4: Quality score
pub fn read_fastq(
    fastq: String,
    seq_clone: Arc<Mutex<VecDeque<String>>>,
    exit_clone: Arc<AtomicBool>,
    total_reads_arc: Arc<AtomicU32>,
) -> Result<(), Box<dyn Error>> {
    let fastq_file = File::open(fastq.clone())?; // open file

    // Create a fastq line reader which keeps track of line number, reads, and posts the sequence to the shared vector
    let mut fastq_line_reader = FastqLineReader::new(seq_clone, exit_clone);

    // If the file is not gzipped use BufReader to read in lines
    if !fastq.ends_with("fastq.gz") {
        // If the file does not end with fastq, return with an error
        if !fastq.ends_with("fastq") {
            return Err(Box::new(FastqError::NotFastq));
        }

        // go line by line
        for line_result in BufReader::new(fastq_file).lines() {
            let line = line_result?;
            // post the line to the shared vector and keep track of the number of sequences etc
            fastq_line_reader.read(line)?;
            if fastq_line_reader.line_num == 4 {
                fastq_line_reader.post()?;
            }
            // Add to read count to print numnber of sequences read by this thread
            if fastq_line_reader.total_reads % 10000 == 0 {
                print!("{}", fastq_line_reader);
                std::io::stdout().flush()?;
            }
        }
    } else {
        println!("If this program stops reading before the expected number of sequencing reads, unzip the gzipped fastq and rerun.");
        println!();
        // stream in first by decoding with GzDecoder, the reading into buffer
        let mut reader = BufReader::new(GzDecoder::new(fastq_file));

        // artificially set the read response to 10.  The first number does not matter
        let mut read_response = 10;
        // continue reading until there is a response of 0, which indicates the end of file.  This may be where some gzipped files abrupty end
        while read_response != 0 {
            let mut line = String::new();
            // move the read line to the line variable and get the response to check if it is 0 and therefore the file is done
            read_response = reader.read_line(&mut line)?;
            // post the line to the shared vector and keep track of the number of sequences etc
            fastq_line_reader.read(line.replace('\n', ""))?;
            if fastq_line_reader.line_num == 4 {
                fastq_line_reader.post()?;
            }
            // Add to read count to print numnber of sequences read by this thread
            if fastq_line_reader.total_reads % 10000 == 0 {
                print!("{}", fastq_line_reader);
                std::io::stdout().flush()?;
            }
        }
    }
    // Display the final total read count
    print!("{}", fastq_line_reader);
    total_reads_arc.store(fastq_line_reader.total_reads, Ordering::Relaxed);
    println!();
    Ok(())
}

/// A struct with functions for keeping track of read information and to post sequence lines to the shared vector
struct FastqLineReader {
    test: bool,   // whether or not to test the fastq format. Only does this for the first read
    line_num: u8, // the current line number 1-4.  Resets back to 1
    total_reads: u32, // total sequences read within the fastq file
    raw_sequence_read_string: String,
    seq_clone: Arc<Mutex<VecDeque<String>>>, // the vector that is passed between threads which containst the sequences
    exit_clone: Arc<AtomicBool>, // a bool which is set to true when one of the other threads panic.  This is the prevent hanging and is used to exit this thread
}

impl FastqLineReader {
    /// Creates a new FastqLineReader struct
    pub fn new(seq_clone: Arc<Mutex<VecDeque<String>>>, exit_clone: Arc<AtomicBool>) -> Self {
        FastqLineReader {
            test: true,
            line_num: 0,
            total_reads: 0,
            raw_sequence_read_string: String::new(),
            seq_clone,
            exit_clone,
        }
    }

    /// Reads in the line and either passes to the vec or discards it, depending if it is a sequence line.  Also increments on line count, sequence count etc.
    pub fn read(&mut self, line: String) -> Result<(), Box<dyn Error>> {
        // Pause if there are already 10000 sequences in the vec so memory is not overloaded
        while self.seq_clone.lock().unwrap().len() >= 10000 {
            // if threads have failed exit out of this thread
            if self.exit_clone.load(Ordering::Relaxed) {
                break;
            }
        }
        // increase line number and if it has passed line 4, reset to 1
        self.line_num += 1;
        if self.line_num == 5 {
            self.line_num = 1
        }
        if self.line_num == 1 {
            self.total_reads += 1;
            self.raw_sequence_read_string = line;
        } else {
            self.raw_sequence_read_string.push('\n');
            self.raw_sequence_read_string.push_str(&line);
        }
        Ok(())
    }

    pub fn post(&mut self) -> Result<(), Box<dyn Error>> {
        // Insert the sequence into the vec.  This will be popped out by other threads
        if self.test {
            crate::parse::RawSequenceRead::unpack(self.raw_sequence_read_string.clone())?
                .check_fastq_format()?;
            self.test = false;
        }
        self.seq_clone
            .lock()
            .unwrap()
            .push_front(self.raw_sequence_read_string.clone());
        Ok(())
    }
}

impl fmt::Display for FastqLineReader {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "Total sequences:             {}\r",
            self.total_reads.to_formatted_string(&Locale::en)
        )
    }
}