use async_stream::stream;
use rust_htslib::bcf;
use tokio_stream::Stream;
use tracing::error;
use crate::{
common::{coords::GenomeRegion, list_of_regions::Regions},
counter,
vcf::pipeline::{clusterizer::cluster::ClusteringSettings, reader::VCFReader},
};
/// Working state of the clustering loop driven by `find_clusters`.
struct State {
/// Settings consulted to decide whether a record joins the current buffer
/// (`belongs`) and whether the buffered records form a cluster (`is_cluster`).
strategy: ClusteringSettings,
/// Scratch record the reader fills on every read; it is cloned before being
/// handed to `consume_record`, so the same allocation is reused across reads.
record_buf: bcf::Record,
/// Records accumulated since the last flush.
buffer: Vec<bcf::Record>,
/// `rid` of the most recently consumed record; `None` until a record with a
/// `rid` has been consumed.
last_rid: Option<u32>,
}
impl State {
    /// Creates a state with an empty buffer and no reference sequence seen yet.
    ///
    /// `record_buf` is the scratch record that the reader fills on each read.
    const fn new(strategy: ClusteringSettings, record_buf: bcf::Record) -> Self {
        Self {
            strategy,
            record_buf,
            buffer: Vec::new(),
            last_rid: None,
        }
    }

    /// Feeds one record into the buffer and returns whatever had to be flushed
    /// to make room for it.
    ///
    /// * If the record has the same `rid` as the previous one and the strategy
    ///   reports that it belongs with the last buffered record, it is appended
    ///   and nothing is emitted.
    /// * Otherwise (different `rid`, or the strategy says it does not belong)
    ///   the current buffer is flushed and the record starts a fresh buffer.
    ///
    /// The record is retained in every case: a record handed to this method
    /// is never dropped.
    fn consume_record(&mut self, record: bcf::Record) -> Option<ClusterOrRecords> {
        let rid = record.rid();

        // The strategy is only consulted when the record sits on the same
        // reference sequence as the buffered ones; comparing it against the
        // tail of a different sequence says nothing about cluster membership.
        // A missing/failed answer from `belongs` is treated as "does not belong".
        let extends_buffer = rid == self.last_rid
            && self
                .strategy
                .belongs(self.buffer.last(), &record)
                .unwrap_or(false);
        self.last_rid = rid;

        if extends_buffer {
            self.buffer.push(record);
            return None;
        }

        // Close the previous group first, then let this record open the next
        // one. Pushing unconditionally keeps the first record of a new
        // reference sequence from being silently lost.
        let flushed = self.flush();
        self.buffer.push(record);
        flushed
    }

    /// Empties the buffer and wraps its former contents in an event.
    ///
    /// Returns `Cluster` when the strategy classifies the buffered records as
    /// a cluster, `Records` when the buffer is non-empty but not a cluster,
    /// and `None` when there is nothing to emit.
    fn flush(&mut self) -> Option<ClusterOrRecords> {
        if self.strategy.is_cluster(&self.buffer, |r| r, |_| None) {
            Some(ClusterOrRecords::Cluster(std::mem::take(&mut self.buffer)))
        } else if self.buffer.is_empty() {
            None
        } else {
            Some(ClusterOrRecords::Records(std::mem::take(&mut self.buffer)))
        }
    }
}
/// Event emitted by the clustering stream.
pub enum ClusterOrRecords {
/// Buffered records that the clustering strategy classified as a cluster.
Cluster(Vec<bcf::Record>),
/// Buffered records that were flushed without forming a cluster.
Records(Vec<bcf::Record>),
/// Emitted when a read yields no record, nothing is buffered, and the reader
/// reports that it is not done yet.
SequenceDone,
}
/// Compact debug output: prints only the kind of event and how many records it
/// carries, never the records themselves.
impl std::fmt::Debug for ClusterOrRecords {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::SequenceDone => f.write_str("sequence done"),
            Self::Cluster(members) => write!(f, "cluster of {}", members.len()),
            Self::Records(batch) => write!(f, "{} records", batch.len()),
        }
    }
}
/// Turns the records produced by `reader` into a stream of [`ClusterOrRecords`]
/// events.
///
/// Every loop iteration performs one read inside `tokio::task::block_in_place`
/// and then reacts to its outcome:
///
/// * **Record read** – the record is dropped (and counted) if `targets` is set
///   and the record's region cannot be derived or is not contained in it, or if
///   `has_filter(Id(0))` is false (the counter names suggest id 0 is the PASS
///   filter — confirm against rust-htslib). Otherwise a clone of the scratch
///   record is handed to the clustering state.
/// * **No record, buffer non-empty** – the buffer is flushed.
/// * **No record, buffer empty** – the stream ends if the reader is done,
///   otherwise `SequenceDone` is emitted.
/// * **Read error** – counted, logged, and skipped.
///
/// Iterations that produce no event simply move on to the next read.
pub(super) fn find_clusters(
    strategy: ClusteringSettings,
    mut reader: VCFReader,
    targets: Option<Regions>,
) -> impl Stream<Item = ClusterOrRecords> {
    let mut state = State::new(strategy, reader.empty_record());

    stream! {
        loop {
            let outcome =
                tokio::task::block_in_place(|| reader.read(&mut state.record_buf));

            let emitted = match outcome {
                Some(Err(e)) => {
                    counter!("records.read.error").inc(1);
                    error!("Error reading record: {e}");
                    None
                }
                Some(Ok(())) => {
                    let region = GenomeRegion::try_from(&state.record_buf);
                    // With no target list every record is on target; otherwise
                    // the region must convert and be contained in the targets.
                    let on_target = match targets.as_ref() {
                        None => true,
                        Some(wanted) => region.is_ok_and(|r| wanted.contains(&r)),
                    };

                    if !on_target {
                        counter!("records.read.target_filter").inc(1);
                        None
                    } else if !state.record_buf.has_filter(&bcf::header::Id(0)) {
                        counter!("records.read.vcf_filter").inc(1);
                        None
                    } else {
                        counter!("records.read.pass").inc(1);
                        // The scratch record is overwritten by the next read,
                        // so the state receives its own copy.
                        state.consume_record(state.record_buf.clone())
                    }
                }
                None => {
                    if !state.buffer.is_empty() {
                        state.flush()
                    } else if reader.done() {
                        break;
                    } else {
                        Some(ClusterOrRecords::SequenceDone)
                    }
                }
            };

            if let Some(item) = emitted {
                yield item;
            }
        }
    }
}