twitcher 0.5.0

Find template switch mutations in genomic data
use async_stream::stream;
use rust_htslib::bcf;
use tokio_stream::Stream;
use tracing::error;

use crate::{
    common::{coords::GenomeRegion, list_of_regions::Regions},
    counter,
    vcf::pipeline::{clusterizer::cluster::ClusteringSettings, reader::VCFReader},
};

/// Mutable bookkeeping shared by the clustering stream between reads.
struct State {
    /// Clustering rules: `belongs` decides whether a record extends the buffer,
    /// `is_cluster` decides how a flushed buffer is classified.
    strategy: ClusteringSettings,
    /// Scratch record that the reader fills in on every read.
    record_buf: bcf::Record,
    /// Records accumulated since the last flush.
    buffer: Vec<bcf::Record>,
    /// `rid` of the most recently consumed record; `None` initially.
    last_rid: Option<u32>,
}

impl State {
    /// Create an empty clustering state.
    ///
    /// `record_buf` is the scratch record the reader fills on each read; records that
    /// are kept are cloned out of it into the buffer.
    const fn new(strategy: ClusteringSettings, record_buf: bcf::Record) -> Self {
        Self {
            strategy,
            record_buf,
            buffer: vec![],
            last_rid: None,
        }
    }

    /// Feed one record into the clusterer.
    ///
    /// The record extends the current buffer when it lies on the same contig (`rid`)
    /// as the previously consumed record and `strategy.belongs` accepts it. Otherwise
    /// the current buffer is flushed and the record starts a new buffer.
    ///
    /// Returns the flushed buffer, if this record closed one.
    fn consume_record(&mut self, record: bcf::Record) -> Option<ClusterOrRecords> {
        let rid = record.rid();
        let same_contig = rid == self.last_rid;
        self.last_rid = rid;

        // A record on a different contig can never extend the current buffer.
        let extends_buffer = same_contig
            && self
                .strategy
                .belongs(self.buffer.last(), &record)
                .unwrap_or(false);

        let flushed = if extends_buffer { None } else { self.flush() };
        // The record is always kept: it either extends the buffer or starts a fresh
        // one. It must not be dropped just because it does not belong to the buffer
        // of the previous contig.
        self.buffer.push(record);
        flushed
    }

    /// Take the buffered records and classify them.
    ///
    /// Returns `Cluster` when `strategy.is_cluster` accepts the buffer, `Records` for
    /// any other non-empty buffer, and `None` when nothing is buffered.
    fn flush(&mut self) -> Option<ClusterOrRecords> {
        // Check emptiness first so an empty buffer can never be reported as a cluster.
        if self.buffer.is_empty() {
            return None;
        }
        let is_cluster = self.strategy.is_cluster(&self.buffer, |r| r, |_| None);
        let buf = std::mem::take(&mut self.buffer);
        Some(if is_cluster {
            ClusterOrRecords::Cluster(buf)
        } else {
            ClusterOrRecords::Records(buf)
        })
    }
}

/// One event emitted by the clustering stream.
pub enum ClusterOrRecords {
    /// A flushed group of records that the clustering strategy classified as a cluster.
    Cluster(Vec<bcf::Record>),

    /// A flushed, non-empty group of records that did not qualify as a cluster.
    Records(Vec<bcf::Record>),

    /// The reader ran out of records for the current sequence while nothing was
    /// buffered; more may follow (e.g. from the next region).
    SequenceDone,
}

/// Compact debug output: only the number of records is shown, not their contents.
impl std::fmt::Debug for ClusterOrRecords {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::SequenceDone => f.write_str("sequence done"),
            Self::Cluster(members) => write!(f, "cluster of {}", members.len()),
            Self::Records(members) => write!(f, "{} records", members.len()),
        }
    }
}

/// Stream records from `reader`, grouped into clusters according to `strategy`.
///
/// A record is kept only if:
/// - when `targets` is given, its region is contained in one of the targets; and
/// - it passes the VCF `FILTER` column (header filter id 0, i.e. PASS).
///
/// Kept records are grouped by `State::consume_record`. Every closed group is yielded
/// either as `ClusterOrRecords::Cluster` or as `ClusterOrRecords::Records`.
///
/// When the reader returns `None`, any buffered records are flushed and yielded first.
/// With an empty buffer, the stream ends if `reader.done()` reports completion.
/// Otherwise a `ClusterOrRecords::SequenceDone` marker is yielded and reading continues.
///
/// Read errors are counted and logged, and reading continues.
///
/// Reads go through `tokio::task::block_in_place`, which tokio documents as panicking
/// on a current-thread runtime. Presumably callers use the multi-threaded runtime —
/// confirm before reusing elsewhere.
pub(super) fn find_clusters(
    strategy: ClusteringSettings,
    mut reader: VCFReader,
    targets: Option<Regions>,
) -> impl Stream<Item = ClusterOrRecords> {
    let record_buf = reader.empty_record();

    let mut state = State::new(strategy, record_buf);

    stream! {
        loop {
            // The read is synchronous; keep it from stalling other tasks on this worker.
            let read =
                tokio::task::block_in_place(|| reader.read(&mut state.record_buf));

            let event = match read {
                Some(Ok(())) => {
                    // Only parse the record's region when there are targets to check it
                    // against; records whose region cannot be determined are excluded.
                    let in_targets = targets.as_ref().is_none_or(|t| {
                        GenomeRegion::try_from(&state.record_buf)
                            .is_ok_and(|r| t.contains(&r))
                    });
                    if !in_targets {
                        counter!("records.read.target_filter").inc(1);
                        None // Hole because excluded by targets
                    } else if state.record_buf.has_filter(&bcf::header::Id(0)) {
                        // filter PASS
                        counter!("records.read.pass").inc(1);
                        state.consume_record(state.record_buf.clone())
                    } else {
                        counter!("records.read.vcf_filter").inc(1);
                        None // Hole because filter
                    }
                }
                None if state.buffer.is_empty() => {
                    if reader.done() {
                        // the iterator is finished
                        break;
                    }
                    // there might be more to get, e.g. the next region.
                    Some(ClusterOrRecords::SequenceDone)
                }
                // End of the current sequence with records still buffered: emit them first.
                None => state.flush(),
                Some(Err(e)) => {
                    counter!("records.read.error").inc(1);
                    error!("Error reading record: {e}");
                    None // we have no entry here, but continue reading
                }
            };

            // Ignore holes
            let Some(event) = event else {
                continue;
            };

            yield event;
        }
    }
}