use std::{
fs::File,
io::{BufWriter, Write},
};
use tokio::sync::{
mpsc::{self, error::SendError},
oneshot,
};
use tracing::{debug, error};
use crate::common::{
aligner::InProgress,
csv::{CSVAuxData, TwitcherCSVWriter},
};
// Command-line options that control where results are written and which
// results are kept. Plain `//` comments are used on purpose: clap's derive
// turns `///` doc comments into `--help` text, which would change the
// program's output.
#[derive(clap::Args, Debug)]
pub struct CliOutputArgs {
// Destination for the CSV output. When absent or equal to "-", output goes
// to standard output; any other value is used as a file path (see the
// `TryFrom<&CliOutputArgs>` impl for `InnerWriter`).
#[arg(short, long, value_name = "FILE")]
output: Option<String>,
// When false, results for which `has_ts()` returns false are skipped by
// `InnerWriter::write`; when true, they are written as well.
#[arg(short, long)]
all_clusters: bool,
}
/// One unit of work handed to the [`Writer`]: a pending alignment plus the
/// auxiliary data that is written alongside it.
pub struct ClusterResult {
/// Handle to an alignment that may still be running; the writer task awaits
/// `recv.recv()` on it to obtain the outcome.
pub recv: InProgress,
/// Auxiliary data passed to the CSV writer together with the alignment result.
pub aux: CSVAuxData,
}
/// Handle to the background writer task created by `Writer::spawn`.
pub struct Writer {
/// Sending half of the bounded channel that feeds results to the writer task.
sender: mpsc::Sender<ClusterResult>,
/// Receives a single `()` from the writer task once it has drained the
/// channel and attempted the final flush.
done_rx: oneshot::Receiver<()>,
}
impl Writer {
pub fn spawn(args: &CliOutputArgs) -> anyhow::Result<Writer> {
let (tx, mut rx) = mpsc::channel(1024);
let (done_tx, done_rx) = oneshot::channel();
let mut writer = InnerWriter::try_from(args)?;
tokio::spawn(async move {
while let Some(value) = rx.recv().await {
if let Err(e) = writer.write(value).await {
error!("{e}");
}
}
let _ = writer.inner.flush();
let _ = done_tx.send(());
});
Ok(Writer {
sender: tx,
done_rx,
})
}
pub async fn write(&self, result: ClusterResult) -> Result<(), SendError<ClusterResult>> {
self.sender.send(result).await
}
pub async fn wait_until_done(self) {
std::mem::drop(self.sender);
let _ = self.done_rx.await;
}
}
/// State owned by the background writer task: the CSV sink plus the filter
/// setting taken from the command line.
struct InnerWriter {
/// When false, results whose `has_ts()` is false are not written.
all_clusters: bool,
/// CSV writer wrapping the chosen output (standard output or a file).
inner: TwitcherCSVWriter,
}
impl InnerWriter {
async fn write(&mut self, value: ClusterResult) -> anyhow::Result<()> {
let ClusterResult { mut recv, aux } = value;
let result = recv.recv().await?;
let result = match result.as_ref() {
Ok(result) => result,
Err(sf @ crate::common::aligner::result::AlignmentFailure::SoftFailure { .. }) => {
debug!("{sf}");
return Ok(());
}
Err(crate::common::aligner::result::AlignmentFailure::Error { error }) => {
return Err(anyhow::anyhow!(error.clone()));
}
};
if !result.has_ts() && !self.all_clusters {
return Ok(());
}
self.inner.write(result, aux)?;
Ok(())
}
}
impl TryFrom<&CliOutputArgs> for InnerWriter {
type Error = anyhow::Error;
fn try_from(args: &CliOutputArgs) -> Result<Self, Self::Error> {
let write = match args.output.as_deref() {
None | Some("-") => Box::new(std::io::stdout()) as Box<dyn Write + Send>,
Some(path) => Box::new(BufWriter::new(File::create(path)?)) as Box<dyn Write + Send>,
};
let writer = TwitcherCSVWriter::new(write);
Ok(InnerWriter {
inner: writer,
all_clusters: args.all_clusters,
})
}
}