use std::sync::Arc;
use bstr::ByteSlice;
use rust_htslib::bam::{self, Read};
use tokio_stream::StreamExt;
use tracing::{Span, info, instrument, warn};
use tracing_indicatif::span_ext::IndicatifSpanExt;
use url::Url;
use crate::{
RunnableCommand,
common::{
cluster_settings::ClusteringSettings,
list_of_regions::{CliRegionsArgs, RegionsDefinition},
reference::{CliReferenceArg, ReferenceReader},
},
counter,
reads::{
bam_reader::BAMReader,
clusters::find_clusters,
processor::{CliProcessorArgs, Processor},
},
};
mod bam_reader;
mod clusters;
mod processor;
mod writer;
#[derive(clap::Args, Debug)]
pub struct Command {
bam_file: String,
#[arg(short = 'n', long = "number-of-reads")]
number_of_reads: Option<usize>,
#[command(flatten)]
ref_file: CliReferenceArg,
#[command(flatten)]
regions: CliRegionsArgs,
#[command(flatten)]
clustering: ClusteringSettings,
#[command(flatten)]
processor: CliProcessorArgs,
}
impl RunnableCommand for Command {
#[instrument(name = "reads", skip_all, fields(indicatif.pb_show = true))]
async fn run(self) -> anyhow::Result<()> {
let mut reader = self.open_bam_file().await?;
reader.set_reference(&self.ref_file.file)?; let reference_reader = ReferenceReader::try_from(&self.ref_file)?;
let processor = Processor::new(&self.processor).await?;
let mut rec_buf = Arc::new(bam::Record::new());
let skipped = counter!("reads.skipped");
let num_targets = reader.header().target_count();
let span = Span::current();
span.pb_start();
span.pb_set_length(num_targets.into());
let mut read_counter = 0;
loop {
if let Some(max) = self.number_of_reads {
if read_counter >= max {
info!("Maximum number of reads {max} reached, stopping.");
break;
}
read_counter += 1;
}
match tokio::task::block_in_place(|| reader.read(Arc::make_mut(&mut rec_buf))) {
Some(result) => result?,
None if reader.done() => break,
None => continue,
}
if let Ok(tid) = u32::try_from(rec_buf.tid()) {
span.pb_set_position(tid.into());
}
counter!("reads.total").inc(1);
let seq_len = rec_buf.seq_len();
if seq_len == 0 {
if skipped.get() == 0 {
warn!(
"Skipping read {} with empty sequence.",
rec_buf.qname().as_bstr()
);
}
skipped.inc(1);
continue;
}
if rec_buf.tid() < 0 {
skipped.inc(1);
continue;
}
let mut clusters =
match find_clusters(reader.header(), rec_buf.clone(), &reference_reader, self.clustering.clone()).await {
Ok(clusters) => clusters,
Err(e) => {
warn!(
"Can't find clusters on read {}: {e}",
rec_buf.qname().as_bstr()
);
continue;
}
};
while let Some(cluster) = clusters.next().await {
counter!("clusters").inc(1);
processor
.process_cluster(cluster, &rec_buf, &reference_reader)
.await?;
}
}
processor.wait_until_done().await;
if skipped.get() > 0 {
warn!(
"In total, {skipped} reads were skipped because they are either missing sequence or alignment information."
);
}
Ok(())
}
}
impl Command {
async fn open_bam_file(&self) -> Result<bam_reader::BAMReader, anyhow::Error> {
if let Some(regions) = self.regions.read_regions().await? {
let inner = match self.bam_file.as_str() {
"-" => anyhow::bail!("Cannot create indexed reader from stdin"),
path_or_url => tokio::task::block_in_place(|| {
if let Ok(url) = Url::parse(path_or_url) {
bam::IndexedReader::from_url(&url)
} else {
bam::IndexedReader::from_path(path_or_url)
}
}),
}?;
let regions = regions.into_linear(|a| inner.header().tid(a.as_ref()).unwrap() as usize);
Ok(BAMReader::with_regions(inner, regions))
} else {
let inner = match self.bam_file.as_str() {
"-" => bam::Reader::from_stdin(),
path_or_url => tokio::task::block_in_place(|| {
if let Ok(url) = Url::parse(path_or_url) {
bam::Reader::from_url(&url)
} else {
bam::Reader::from_path(path_or_url)
}
}),
}?;
Ok(BAMReader::entire_file(inner))
}
}
}