twitcher 0.1.9

Find template switch mutations in genomic data
use crate::{
    common::{csv::TwitcherCSVWriter, list_of_regions::Regions, reference::ReferenceReader},
    vcf::pipeline::{clusterizer::ClusterSettings, message::Message},
};
use std::fmt::Debug;

use clusterizer::Clusterizer;
use tokio::sync::mpsc;
use writer::Writer;

use crate::vcf::pipeline::{
    reader::VCFReader,
    writer::{OutputWriter, region_writer::RegionWriter},
};

pub mod clusterizer;
mod message;
pub mod reader;
pub mod writer;

/// Default capacity of the channel that connects the clusterizer to the writer.
///
/// Shared by the `--buffer` command-line default and the `Default` impl of
/// [`PipelineSettings`] so the two values cannot drift apart.
const DEFAULT_BUFFER_SIZE: usize = 8192;

/// Tunable settings for a [`VCFPipeline`] run.
///
/// Usable both as a flattened group of command-line arguments (via `clap`) and
/// programmatically through [`Default`].
#[derive(clap::Args, Debug)]
pub struct PipelineSettings {
    /// Settings handed to the [`Clusterizer`] stage.
    #[command(flatten)]
    pub cluster: clusterizer::ClusterSettings,

    /// Maximum number of messages buffered between the clusterizer and the writer
    #[arg(long = "buffer", default_value_t = DEFAULT_BUFFER_SIZE)]
    pub buffer_size: usize,
}

impl Default for PipelineSettings {
    /// Builds settings from `ClusterSettings::default()` and the same buffer
    /// size that the `--buffer` flag defaults to.
    fn default() -> Self {
        Self {
            cluster: ClusterSettings::default(),
            buffer_size: DEFAULT_BUFFER_SIZE,
        }
    }
}

/// All inputs, outputs and settings needed for one pipeline run.
///
/// The value is consumed by [`VCFPipeline::run`], which splits the fields
/// between a [`Clusterizer`] (producer side) and a [`Writer`] (consumer side).
pub struct VCFPipeline {
    /// VCF input; handed to the clusterizer.
    pub input: VCFReader,
    /// Primary output; handed to the writer.
    pub output: OutputWriter,
    /// Reference reader; handed to the clusterizer.
    pub reference: ReferenceReader,
    /// Optional list of regions; handed to the clusterizer.
    // NOTE(review): presumably restricts processing to these regions — confirm in `Clusterizer`.
    pub targets: Option<Regions>,
    /// Optional region output; handed to the writer alongside the primary output.
    pub region_output: Option<RegionWriter>,
    /// Optional CSV output; handed to the writer alongside the primary output.
    pub csv_output: Option<TwitcherCSVWriter>,
    /// Cluster settings and the channel buffer size used by [`VCFPipeline::run`].
    pub settings: PipelineSettings,
    /// Optional indexed BAM reader; handed to the clusterizer.
    pub bam_f: Option<rust_htslib::bam::IndexedReader>,
}

impl VCFPipeline {
    pub async fn run(self) -> anyhow::Result<()> {
        // TODO do some pre-checks on the input
        // 1. Is there more than one sample?
        // 2. Is the GT generally diploid?

        let aligner_settings = self.settings.cluster;

        let (tx, rx) = mpsc::channel(self.settings.buffer_size);
        tokio::spawn(async move {
            let c = Clusterizer::new(
                self.input,
                self.reference,
                self.targets,
                self.bam_f,
                tx,
                aligner_settings,
            );
            c.run().await;
        });

        // We run the writer on the main thread, as when it completes we can exit.
        let w = Writer::new(rx, self.output, self.region_output, self.csv_output);
        w.run().await?;

        Ok(())
    }
}