// twitcher 0.1.8
//
// Find template switch mutations in genomic data.
use std::{
    fs::File,
    io::{BufWriter, Write},
};

use csv::Writer as CSVWriter;
use generic_a_star::cost::AStarCost;
use serde::Serialize;
use tokio::sync::{
    mpsc::{self, error::SendError},
    oneshot,
};
use tracing::{debug, error};

use crate::common::{
    aligner::{
        InProgress,
        result::{TSData, TwitcherAlignmentCase},
    },
    coords::GenomeRegion,
};

// Command-line options that select the output destination and which clusters are reported.
// NOTE: the `///` comments on the fields below are used by clap's derive as the `--help`
// text, so editing them changes user-facing output.
#[derive(clap::Args, Debug)]
pub struct CliOutputArgs {
    /// Output CSV file; If not specified or "-", csv will be printed to stdout.
    #[arg(short, long, value_name = "FILE")]
    output: Option<String>,

    /// If enabled, write all clusters to the output. Otherwise (the default) only write clusters that are realigned with a template switch.
    #[arg(short, long)]
    all_clusters: bool,
}

/// One unit of work handed to the [`Writer`]: a pending alignment together with the
/// read metadata needed to fill in its output row.
pub struct ClusterResult {
    /// Handle the writer awaits to obtain the alignment outcome.
    pub recv: InProgress,
    /// Genome region; emitted in the `region` column via `to_string()`.
    pub region: GenomeRegion,
    /// Emitted in the `read_id` column.
    pub rec_qname: String,
    /// Emitted in the `read_start` column.
    pub rec_pos: usize,
    /// Added to `rec_pos` to produce the `read_end` column.
    pub rec_seq_len: usize,
}

/// Handle to the background task started by [`Writer::spawn`].
pub struct Writer {
    /// Sending half of the channel that feeds results to the background task.
    sender: mpsc::Sender<ClusterResult>,
    /// Signalled by the background task after it has drained the channel and flushed.
    done_rx: oneshot::Receiver<()>,
}

/// One serialized output row. The field order here is the order in which serde hands
/// the fields to the CSV writer, so do not reorder fields casually.
///
/// The `ts_*`, `*_shift` and `inner_alignment_cigar` fields are built with
/// `TSData::to_field(.., "|", ..)`, one value per computed `TSData` entry.
#[derive(Serialize)]
struct OutputRecord {
    /// `GenomeRegion::to_string()` of the cluster result's region.
    region: String,
    /// Taken from `ClusterResult::rec_qname`.
    read_id: String,
    /// Taken from `ClusterResult::rec_pos`.
    read_start: usize,
    /// `rec_pos + rec_seq_len`.
    read_end: usize,
    // index_in_read: usize,
    /// CIGAR of the alignment with template switches; empty when no switch was found.
    cigar_ext: String,
    /// Cost of the alignment with template switches; `0` when no switch was found.
    alignment_cost: u64,
    /// Number of `TSData` entries computed for the alignment.
    num_ts: usize,
    /// Per-entry `jump_1_2` values.
    ts_1_2: String,
    /// Per-entry `inner_len` values.
    ts_2_3: String,
    /// Per-entry `apg` values.
    ts_1_4: String,
    /// Per-entry negated `er.min_start` values.
    start_left_shift: String,
    /// Per-entry `er.max_start` values.
    start_right_shift: String,
    /// Per-entry negated `er.min_end` values.
    end_left_shift: String,
    /// Per-entry `er.max_end` values.
    end_right_shift: String,
    /// Per-entry `inner_aln.cigar()` values.
    inner_alignment_cigar: String,
    /// Cost of the alignment without template switches; `0` when a switch was found but
    /// no such cost is available.
    alignment_cost_without_ts: u64,
}

impl Writer {
    pub fn spawn(args: &CliOutputArgs) -> anyhow::Result<Writer> {
        let (tx, mut rx) = mpsc::channel(1024);
        let (done_tx, done_rx) = oneshot::channel();
        let mut writer = InnerWriter::try_from(args)?;
        tokio::spawn(async move {
            while let Some(value) = rx.recv().await {
                if let Err(e) = writer.write(value).await {
                    error!("{e}");
                }
            }
            let _ = writer.inner.flush();
            // TODO this 'done' hack is not nice
            let _ = done_tx.send(());
        });
        Ok(Writer {
            sender: tx,
            done_rx,
        })
    }

    pub async fn write(&self, result: ClusterResult) -> Result<(), SendError<ClusterResult>> {
        self.sender.send(result).await
    }

    pub async fn wait_until_done(self) {
        std::mem::drop(self.sender);
        let _ = self.done_rx.await;
    }
}

/// State owned by the background writer task.
struct InnerWriter {
    /// When `false`, results without a template switch are skipped.
    all_clusters: bool,
    /// CSV writer over the chosen destination (stdout or a buffered file).
    inner: CSVWriter<Box<dyn Write + Send>>,
}

impl InnerWriter {
    /// Waits for the alignment behind `value` and, if it qualifies, appends one row for it.
    ///
    /// Soft alignment failures are logged at debug level and skipped. Results without a
    /// template switch are skipped unless `all_clusters` is set.
    ///
    /// # Errors
    /// Fails if the pending alignment cannot be received, if the aligner reported a hard
    /// error, or if serializing the row fails.
    async fn write(&mut self, value: ClusterResult) -> anyhow::Result<()> {
        let ClusterResult {
            recv: mut pending,
            region,
            rec_qname: read_id,
            rec_pos: read_start,
            rec_seq_len,
        } = value;

        let outcome = pending.recv().await?;
        let alignment = match outcome.as_ref() {
            Err(soft @ crate::common::aligner::result::AlignmentFailure::SoftFailure { .. }) => {
                debug!("{soft}");
                return Ok(());
            }
            Err(crate::common::aligner::result::AlignmentFailure::Error { error }) => {
                return Err(anyhow::anyhow!(error.clone()));
            }
            Ok(alignment) => alignment,
        };

        // Only rows with a template switch are reported unless everything was requested.
        if !(alignment.has_ts() || self.all_clusters) {
            return Ok(());
        }

        let (cigar_ext, alignment_cost, alignment_cost_without_ts) = match &alignment.result {
            TwitcherAlignmentCase::NoTS {
                cost_without_ts, ..
            } => (String::new(), 0, cost_without_ts.as_u64()),
            TwitcherAlignmentCase::FoundTS {
                alignment_with_ts,
                cost_with_ts,
                cost_without_ts,
            } => {
                let plain_cost = cost_without_ts.map(|c| c.as_u64()).unwrap_or(0);
                (alignment_with_ts.cigar(), cost_with_ts.as_u64(), plain_cost)
            }
        };

        let switches = TSData::compute(alignment);
        let sep = "|";
        let read_end = read_start + rec_seq_len;
        let record = OutputRecord {
            region: region.to_string(),
            read_id,
            read_start,
            read_end,
            cigar_ext,
            alignment_cost,
            num_ts: switches.len(),
            ts_1_2: TSData::to_field(&switches, sep, |d| d.jump_1_2),
            ts_2_3: TSData::to_field(&switches, sep, |d| d.inner_len),
            ts_1_4: TSData::to_field(&switches, sep, |d| d.apg),
            start_left_shift: TSData::to_field(&switches, sep, |d| -d.er.min_start),
            start_right_shift: TSData::to_field(&switches, sep, |d| d.er.max_start),
            end_left_shift: TSData::to_field(&switches, sep, |d| -d.er.min_end),
            end_right_shift: TSData::to_field(&switches, sep, |d| d.er.max_end),
            inner_alignment_cigar: TSData::to_field(&switches, sep, |d| d.inner_aln.cigar()),
            alignment_cost_without_ts,
        };

        // Serialization may perform blocking I/O, so run it as a blocking section.
        tokio::task::block_in_place(|| self.inner.serialize(record))?;
        Ok(())
    }
}

impl TryFrom<&CliOutputArgs> for InnerWriter {
    type Error = anyhow::Error;

    fn try_from(args: &CliOutputArgs) -> Result<Self, Self::Error> {
        let write = match args.output.as_deref() {
            None | Some("-") => Box::new(std::io::stdout()) as Box<dyn Write + Send>,
            Some(path) => Box::new(BufWriter::new(File::create(path)?)) as Box<dyn Write + Send>,
        };
        let writer = CSVWriter::from_writer(write);
        Ok(InnerWriter {
            inner: writer,
            all_clusters: args.all_clusters,
        })
    }
}