twitcher 0.1.9

Find template switch mutations in genomic data
use std::{
    fs::File,
    io::{BufWriter, Write},
};

use tokio::sync::{
    mpsc::{self, error::SendError},
    oneshot,
};
use tracing::{debug, error};

use crate::common::{
    aligner::InProgress,
    csv::{CSVAuxData, TwitcherCSVWriter},
};

// Command-line options that choose the CSV output destination and which clusters are written.
// NOTE: with `#[derive(clap::Args)]`, the `///` comments on the fields are used by clap as the
// `--help` text for each option, so editing them changes user-facing CLI output.
#[derive(clap::Args, Debug)]
pub struct CliOutputArgs {
    // `None` or "-" selects stdout in `InnerWriter::try_from`; any other value is used as a
    // path for `File::create`, whose file is wrapped in a `BufWriter`.
    /// Output CSV file; If not specified or "-", csv will be printed to stdout.
    #[arg(short, long, value_name = "FILE")]
    output: Option<String>,

    // When false, `InnerWriter::write` skips successful results whose `has_ts()` is false.
    /// If enabled, write all clusters to the output. Otherwise (the default) only write clusters that are realigned with a template switch.
    #[arg(short, long)]
    all_clusters: bool,
}

/// One cluster's pending alignment plus the extra data needed to write its CSV row.
///
/// Values are handed to the background writer task through [`Writer::write`].
pub struct ClusterResult {
    /// Handle from which the writer task receives the alignment result (`recv().await`).
    /// NOTE(review): the type name suggests the alignment may still be running when this is
    /// sent — confirm against `InProgress`.
    pub recv: InProgress,
    /// Auxiliary per-cluster data passed to `TwitcherCSVWriter::write` alongside the result.
    pub aux: CSVAuxData,
}

/// Handle to a background Tokio task that writes [`ClusterResult`]s as CSV rows.
///
/// Create it with [`Writer::spawn`], queue results with [`Writer::write`], and call
/// [`Writer::wait_until_done`] to close the queue and wait for the task to finish.
pub struct Writer {
    /// Sending half of the bounded queue consumed by the background task.
    sender: mpsc::Sender<ClusterResult>,
    /// Completes once the background task has drained the queue and attempted to flush the output.
    done_rx: oneshot::Receiver<()>,
}

impl Writer {
    pub fn spawn(args: &CliOutputArgs) -> anyhow::Result<Writer> {
        let (tx, mut rx) = mpsc::channel(1024);
        let (done_tx, done_rx) = oneshot::channel();
        let mut writer = InnerWriter::try_from(args)?;
        tokio::spawn(async move {
            while let Some(value) = rx.recv().await {
                if let Err(e) = writer.write(value).await {
                    error!("{e}");
                }
            }
            let _ = writer.inner.flush();
            // TODO this 'done' hack is not nice
            let _ = done_tx.send(());
        });
        Ok(Writer {
            sender: tx,
            done_rx,
        })
    }

    pub async fn write(&self, result: ClusterResult) -> Result<(), SendError<ClusterResult>> {
        self.sender.send(result).await
    }

    pub async fn wait_until_done(self) {
        std::mem::drop(self.sender);
        let _ = self.done_rx.await;
    }
}

/// State owned by the background writer task: the CSV writer and the cluster filter setting.
struct InnerWriter {
    /// If true, write every successful result; otherwise only those where `has_ts()` is true.
    all_clusters: bool,
    /// CSV writer over stdout or the output file (see the `TryFrom<&CliOutputArgs>` impl).
    inner: TwitcherCSVWriter,
}

impl InnerWriter {
    /// Waits for one cluster's alignment and writes it as a CSV row if it qualifies.
    ///
    /// A soft failure is logged at debug level and skipped without error. A successful result
    /// is written only if it contains a template switch (`has_ts()`), unless `all_clusters`
    /// is set.
    ///
    /// # Errors
    /// Fails if receiving the alignment fails, if the aligner reported an
    /// `AlignmentFailure::Error`, or if writing the CSV row fails.
    async fn write(&mut self, value: ClusterResult) -> anyhow::Result<()> {
        use crate::common::aligner::result::AlignmentFailure;

        let ClusterResult { mut recv, aux } = value;
        let outcome = recv.recv().await?;

        let alignment = match outcome.as_ref() {
            Ok(alignment) => alignment,
            Err(sf @ AlignmentFailure::SoftFailure { .. }) => {
                // Soft failures are only logged; they do not count as errors.
                debug!("{sf}");
                return Ok(());
            }
            Err(AlignmentFailure::Error { error }) => {
                return Err(anyhow::anyhow!(error.clone()));
            }
        };

        // Skip unless the result has a template switch or every cluster was requested.
        // `has_ts()` is evaluated first, exactly as in `!has_ts() && !all_clusters`.
        let skip = !(alignment.has_ts() || self.all_clusters);
        if skip {
            return Ok(());
        }

        self.inner.write(alignment, aux)?;
        Ok(())
    }
}

impl TryFrom<&CliOutputArgs> for InnerWriter {
    type Error = anyhow::Error;

    fn try_from(args: &CliOutputArgs) -> Result<Self, Self::Error> {
        let write = match args.output.as_deref() {
            None | Some("-") => Box::new(std::io::stdout()) as Box<dyn Write + Send>,
            Some(path) => Box::new(BufWriter::new(File::create(path)?)) as Box<dyn Write + Send>,
        };
        let writer = TwitcherCSVWriter::new(write);
        Ok(InnerWriter {
            inner: writer,
            all_clusters: args.all_clusters,
        })
    }
}