kractor 4.0.0

Extract reads from a FASTQ file based on taxonomic classification via Kraken2.
use crate::extract::{process_paired_end, process_single_end};
use crate::parsers::kraken::ProcessedKrakenOutput;
use crate::{extract, parsers, Cli};
use color_eyre::eyre::{bail, ensure};
use color_eyre::Result;
use fxhash::{FxHashMap, FxHashSet};
use log::info;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Summary {
    taxons_identified: Vec<i32>,
    reads_extracted_per_taxon: FxHashMap<i32, usize>,
    total_reads_in: usize,
    total_reads_out: usize,
    proportion_extracted: f64,
    input_format: String,
    output_format: String,
    kractor_version: String,
    missing_taxon_ids: Vec<i32>,
}

pub struct Kractor {
    args: Cli,
    taxon_ids: Vec<i32>,
    missing_taxon_ids: Vec<i32>,
    reads_to_save: FxHashSet<Vec<u8>>,
    reads_per_taxon: FxHashMap<i32, usize>,
    summary: Option<Summary>,
}

impl Kractor {
    pub fn new(args: Cli) -> Self {
        Self {
            args,
            taxon_ids: Vec::new(),
            missing_taxon_ids: Vec::new(),
            reads_to_save: FxHashSet::default(),
            reads_per_taxon: FxHashMap::default(),
            summary: None,
        }
    }

    fn validate_outputs(&self) -> Result<()> {
        for out_file in &self.args.output {
            ensure!(
                !out_file.exists(),
                "Output file already exists: {:?}",
                out_file
            );
        }
        Ok(())
    }

    fn collect_taxons(&mut self) -> Result<()> {
        let collected = extract::collect_taxons_to_save(
            &self.args.report,
            self.args.children,
            self.args.parents,
            &self.args.taxid,
            !self.args.no_report_header_detect,
        )?;
        self.taxon_ids = collected.found;
        self.missing_taxon_ids = collected.missing;
        Ok(())
    }

    fn process_kraken_output(&mut self) -> Result<()> {
        let ProcessedKrakenOutput {
            reads_to_save,
            reads_per_taxon,
        } = parsers::kraken::process_kraken_output(
            &self.args.kraken,
            self.args.exclude,
            &self.taxon_ids,
        )?;
        self.reads_to_save = reads_to_save;
        self.reads_per_taxon = reads_per_taxon;

        if self.reads_to_save.is_empty() {
            bail!("No reads found for the specified taxon ID(s). Nothing to extract.");
        }

        info!("Identified {} reads to save", self.reads_to_save.len());
        Ok(())
    }

    fn process_reads(&mut self) -> Result<()> {
        let paired = self.args.input.len() == 2;
        let input_format = if paired { "paired" } else { "single" };
        let reads_extracted_per_taxon = self.get_reads_extracted_per_taxon();

        if paired {
            let ((reads_parsed1, reads_output1), (reads_parsed2, reads_output2)) =
                process_paired_end(
                    &self.reads_to_save,
                    &self.args.input,
                    &self.args.output,
                    self.args.output_type,
                    self.args.compression_level,
                    self.args.output_fasta,
                )?;

            let reads_in = reads_parsed1 + reads_parsed2;

            let reads_out = reads_output1 + reads_output2;

            self.summary = Some(Summary {
                taxons_identified: self.taxon_ids.clone(),
                reads_extracted_per_taxon: reads_extracted_per_taxon.clone(),
                total_reads_in: reads_in,
                total_reads_out: reads_out,
                proportion_extracted: reads_out as f64 / reads_in as f64,
                input_format: input_format.to_string(),
                output_format: if self.args.output_fasta {
                    "fasta".to_string()
                } else {
                    "fastq".to_string()
                },
                kractor_version: env!("CARGO_PKG_VERSION").to_string(),
                missing_taxon_ids: self.missing_taxon_ids.clone(),
            });
        } else {
            let (reads_parsed1, reads_output1) = process_single_end(
                &self.reads_to_save,
                &self.args.input,
                &self.args.output,
                self.args.output_type,
                self.args.compression_level,
                self.args.output_fasta,
            )?;

            let reads_in = reads_parsed1;
            let reads_out = reads_output1;

            self.summary = Some(Summary {
                taxons_identified: self.taxon_ids.clone(),
                reads_extracted_per_taxon,
                missing_taxon_ids: self.missing_taxon_ids.clone(),
                total_reads_in: reads_in,
                total_reads_out: reads_out,
                proportion_extracted: reads_out as f64 / reads_in as f64,
                input_format: input_format.to_string(),
                output_format: if self.args.output_fasta {
                    "fasta".to_string()
                } else {
                    "fastq".to_string()
                },
                kractor_version: env!("CARGO_PKG_VERSION").to_string(),
            });
        }

        Ok(())
    }

    fn output_summary(&self) -> Result<()> {
        if let Some(summary) = &self.summary {
            if self.args.summary {
                let json = serde_json::to_string_pretty(summary)?;
                println!("{json}");
            }
        }
        Ok(())
    }

    fn get_reads_extracted_per_taxon(&self) -> FxHashMap<i32, usize> {
        let mut reads_extracted_per_taxon = self.reads_per_taxon.clone();
        for taxon_id in &self.taxon_ids {
            reads_extracted_per_taxon.entry(*taxon_id).or_insert(0);
        }
        reads_extracted_per_taxon
    }

    pub fn run(&mut self) -> Result<()> {
        info!(
            "Starting kractor at {}",
            chrono::Local::now().format("%H:%M:%S")
        );
        self.validate_outputs()?;
        self.collect_taxons()?;
        info!("Processing Kraken2 output file");
        self.process_kraken_output()?;
        info!("Processing reads");
        self.process_reads()?;
        info!("Complete at {}", chrono::Local::now().format("%H:%M:%S"));
        self.output_summary()?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use tempfile::tempdir;

    use super::*;

    #[test]
    fn test_output_doesnt_exist() {
        let temp_dir = tempdir().unwrap();
        let output_file = temp_dir.path().join("output.fastq");
        let input_files = vec![PathBuf::from("input1.fastq"), PathBuf::from("input2.fastq")];
        let args = Cli {
            input: input_files,
            output: vec![output_file],
            kraken: PathBuf::from("kraken_output.txt"),
            report: None,
            taxid: vec![1, 2, 3],
            output_type: None,
            compression_level: niffler::Level::One,
            parents: false,
            children: false,
            exclude: false,
            output_fasta: false,
            summary: false,
            no_report_header_detect: false,
            verbose: false,
        };
        let kractor = Kractor::new(args);
        assert!(kractor.validate_outputs().is_ok());
    }

    #[test]
    fn test_output_exists() {
        let temp_dir = tempdir().unwrap();
        let output_file = temp_dir.path().join("output.fastq");
        std::fs::File::create(&output_file).unwrap();
        let input_files = vec![PathBuf::from("input.fastq")];
        let args = Cli {
            input: input_files,
            output: vec![output_file],
            kraken: PathBuf::from("kraken_output.txt"),
            report: None,
            taxid: vec![1, 2, 3],
            output_type: None,
            compression_level: niffler::Level::One,
            parents: false,
            children: false,
            exclude: false,
            output_fasta: false,
            summary: false,
            no_report_header_detect: false,
            verbose: false,
        };
        let kractor = Kractor::new(args);
        assert!(kractor.validate_outputs().is_err());
    }

    #[test]
    fn test_get_reads_extracted_per_taxon() {
        let input_files = vec![PathBuf::from("input.fastq")];
        let args = Cli {
            input: input_files,
            output: vec![PathBuf::from("output.fastq")],
            kraken: PathBuf::from("kraken_output.txt"),
            report: None,
            taxid: vec![2901879, 227984],
            output_type: None,
            compression_level: niffler::Level::One,
            parents: false,
            children: false,
            exclude: false,
            output_fasta: false,
            summary: false,
            no_report_header_detect: false,
            verbose: false,
        };
        let mut kractor = Kractor::new(args);
        kractor.taxon_ids = vec![2901879, 227984];
        kractor.reads_per_taxon.insert(227984, 257);

        let reads_extracted_per_taxon = kractor.get_reads_extracted_per_taxon();

        assert_eq!(reads_extracted_per_taxon.get(&2901879), Some(&0));
        assert_eq!(reads_extracted_per_taxon.get(&227984), Some(&257));
    }
}