use std::{
fs::File,
io::{BufWriter, Write},
};
use csv::Writer as CSVWriter;
use generic_a_star::cost::AStarCost;
use serde::Serialize;
use tokio::sync::{
mpsc::{self, error::SendError},
oneshot,
};
use tracing::{debug, error};
use crate::common::{
aligner::{
InProgress,
result::{TSData, TwitcherAlignmentCase},
},
coords::GenomeRegion,
};
// Command-line options selecting the output destination and which clusters are reported.
#[derive(clap::Args, Debug)]
pub struct CliOutputArgs {
    /// Write CSV output to FILE instead of stdout ("-" also selects stdout)
    #[arg(short, long, value_name = "FILE")]
    output: Option<String>,
    /// Output rows for all clusters, including those where no TS was found
    #[arg(short, long)]
    all_clusters: bool,
}
/// A pending alignment plus the read metadata needed to emit one output row
/// through [`Writer::write`].
pub struct ClusterResult {
    /// Handle the writer awaits (`recv().await`) to obtain the alignment outcome.
    pub recv: InProgress,
    /// Region written (via `to_string()`) to the `region` column.
    pub region: GenomeRegion,
    /// Read name, written to the `read_id` column.
    pub rec_qname: String,
    /// Read start position, written to `read_start`; `read_end` is computed from it.
    /// NOTE(review): coordinate base (0- or 1-based) is not visible here — confirm.
    pub rec_pos: usize,
    /// Read sequence length; `read_end` is written as `rec_pos + rec_seq_len`.
    pub rec_seq_len: usize,
}
/// Handle to a background task that serializes [`ClusterResult`]s as CSV rows.
///
/// Create it with [`Writer::spawn`], queue results with [`Writer::write`], and call
/// [`Writer::wait_until_done`] to close the queue and wait for the final flush.
pub struct Writer {
    /// Queue consumed by the background task.
    sender: mpsc::Sender<ClusterResult>,
    /// Signalled by the background task after it has drained the queue and flushed.
    done_rx: oneshot::Receiver<()>,
}
/// One CSV output row.
///
/// Serialized with `csv` + `serde`: the header comes from the field names and the
/// columns follow declaration order, so renaming or reordering fields changes the
/// output format. Multi-valued `ts_*`, `*_shift` and `inner_alignment_cigar` fields
/// hold one value per entry of `TSData::compute`, joined with `|`
/// (presumably in that order — confirm against `TSData::to_field`).
#[derive(Serialize)]
struct OutputRecord {
    /// Stringified cluster region.
    region: String,
    /// Read name.
    read_id: String,
    /// Read start position (`rec_pos`).
    read_start: usize,
    /// `rec_pos + rec_seq_len`.
    read_end: usize,
    /// CIGAR of the alignment with TS; empty when no TS was found.
    cigar_ext: String,
    /// Cost of the alignment with TS; 0 when no TS was found.
    alignment_cost: u64,
    /// Number of entries returned by `TSData::compute`.
    num_ts: usize,
    /// Per-TS `jump_1_2` values.
    ts_1_2: String,
    /// Per-TS `inner_len` values.
    ts_2_3: String,
    /// Per-TS `apg` values.
    ts_1_4: String,
    /// Per-TS negated `er.min_start` values.
    start_left_shift: String,
    /// Per-TS `er.max_start` values.
    start_right_shift: String,
    /// Per-TS negated `er.min_end` values.
    end_left_shift: String,
    /// Per-TS `er.max_end` values.
    end_right_shift: String,
    /// Per-TS CIGAR of `inner_aln`.
    inner_alignment_cigar: String,
    /// Cost of the alignment without TS; 0 if a TS was found but this cost is absent.
    alignment_cost_without_ts: u64,
}
impl Writer {
pub fn spawn(args: &CliOutputArgs) -> anyhow::Result<Writer> {
let (tx, mut rx) = mpsc::channel(1024);
let (done_tx, done_rx) = oneshot::channel();
let mut writer = InnerWriter::try_from(args)?;
tokio::spawn(async move {
while let Some(value) = rx.recv().await {
if let Err(e) = writer.write(value).await {
error!("{e}");
}
}
let _ = writer.inner.flush();
let _ = done_tx.send(());
});
Ok(Writer {
sender: tx,
done_rx,
})
}
pub async fn write(&self, result: ClusterResult) -> Result<(), SendError<ClusterResult>> {
self.sender.send(result).await
}
pub async fn wait_until_done(self) {
std::mem::drop(self.sender);
let _ = self.done_rx.await;
}
}
/// CSV sink driven by the background task spawned in [`Writer::spawn`].
struct InnerWriter {
    /// When false, results for which no TS was found are skipped.
    all_clusters: bool,
    /// CSV writer over stdout or a buffered output file.
    inner: CSVWriter<Box<dyn Write + Send>>,
}
impl InnerWriter {
async fn write(&mut self, value: ClusterResult) -> anyhow::Result<()> {
let ClusterResult {
mut recv,
region,
rec_qname,
rec_pos,
rec_seq_len,
} = value;
let result = recv.recv().await?;
let result = match result.as_ref() {
Ok(result) => result,
Err(sf @ crate::common::aligner::result::AlignmentFailure::SoftFailure { .. }) => {
debug!("{sf}");
return Ok(());
}
Err(crate::common::aligner::result::AlignmentFailure::Error { error }) => {
return Err(anyhow::anyhow!(error.clone()));
}
};
if !result.has_ts() && !self.all_clusters {
return Ok(());
}
let (cigar_ext, alignment_cost, alignment_cost_without_ts) = match &result.result {
TwitcherAlignmentCase::FoundTS {
alignment_with_ts,
cost_with_ts,
cost_without_ts,
} => (
alignment_with_ts.cigar(),
cost_with_ts.as_u64(),
cost_without_ts.map_or(0, |c| c.as_u64()),
),
TwitcherAlignmentCase::NoTS {
cost_without_ts, ..
} => (String::new(), 0, cost_without_ts.as_u64()),
};
let tss = TSData::compute(result);
let out = OutputRecord {
region: region.to_string(),
read_id: rec_qname,
read_start: rec_pos,
read_end: rec_pos + rec_seq_len,
cigar_ext,
alignment_cost,
num_ts: tss.len(),
ts_1_2: TSData::to_field(&tss, "|", |d| d.jump_1_2),
ts_2_3: TSData::to_field(&tss, "|", |d| d.inner_len),
ts_1_4: TSData::to_field(&tss, "|", |d| d.apg),
start_left_shift: TSData::to_field(&tss, "|", |d| -d.er.min_start),
start_right_shift: TSData::to_field(&tss, "|", |d| d.er.max_start),
end_left_shift: TSData::to_field(&tss, "|", |d| -d.er.min_end),
end_right_shift: TSData::to_field(&tss, "|", |d| d.er.max_end),
inner_alignment_cigar: TSData::to_field(&tss, "|", |d| d.inner_aln.cigar()),
alignment_cost_without_ts,
};
tokio::task::block_in_place(|| self.inner.serialize(out))?;
Ok(())
}
}
impl TryFrom<&CliOutputArgs> for InnerWriter {
type Error = anyhow::Error;
fn try_from(args: &CliOutputArgs) -> Result<Self, Self::Error> {
let write = match args.output.as_deref() {
None | Some("-") => Box::new(std::io::stdout()) as Box<dyn Write + Send>,
Some(path) => Box::new(BufWriter::new(File::create(path)?)) as Box<dyn Write + Send>,
};
let writer = CSVWriter::from_writer(write);
Ok(InnerWriter {
inner: writer,
all_clusters: args.all_clusters,
})
}
}