twitcher 0.1.8

Find template switch mutations in genomic data
use std::io::IsTerminal;

use rlimit::Resource;
use serde::{Deserialize, Serialize};
use tracing::{error, info_span, instrument};

use crate::{
    VerbositySelector,
    common::{
        aligner::{
            AStarAlignerPair, AlignerSelector, AlignmentQuery,
            result::{self},
        },
        coords::GenomeRegion,
    },
};

/// Either a borrowed `&'a T` or an owned `T`.
///
/// Similar to [`std::borrow::Cow`], but without the `T: ToOwned` bound, so it also works
/// for types that are not `Clone`. Both variants dereference to `&T`.
pub enum RefOrOwned<'a, T: 'a> {
    /// A value borrowed for the lifetime `'a`.
    Ref(&'a T),
    /// A value held by value.
    Owned(T),
}

impl<T> std::ops::Deref for RefOrOwned<'_, T> {
    type Target = T;

    /// Borrow the wrapped value, regardless of which variant holds it.
    fn deref(&self) -> &T {
        match *self {
            Self::Ref(borrowed) => borrowed,
            Self::Owned(ref owned) => owned,
        }
    }
}

/// Serializes exactly like the wrapped `T`; the variant is not part of the encoding.
impl<T: Serialize> Serialize for RefOrOwned<'_, T> {
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        // `Deref` yields `&T` for both variants, so delegate straight to `T`.
        let inner: &T = &**self;
        T::serialize(inner, serializer)
    }
}

/// Deserializes a `T` and always wraps it in [`RefOrOwned::Owned`], since a freshly
/// decoded value has nothing to borrow from.
impl<'de, T: Deserialize<'de>> Deserialize<'de> for RefOrOwned<'_, T> {
    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        let value = T::deserialize(deserializer)?;
        Ok(Self::Owned(value))
    }
}

impl<T> From<T> for RefOrOwned<'_, T> {
    fn from(value: T) -> Self {
        Self::Owned(value)
    }
}

impl<'a, T> From<&'a T> for RefOrOwned<'a, T> {
    fn from(value: &'a T) -> Self {
        Self::Ref(value)
    }
}

/// Request handed to a worker process: which aligner to run, on which input, and under
/// which constraints.
///
/// The worker decodes this from stdin (see `read_from_stdin`). After decoding, `aligner`
/// is always [`RefOrOwned::Owned`], because that is what `RefOrOwned`'s `Deserialize`
/// impl produces.
///
/// NOTE(review): `rmp_serde`'s default struct encoding is positional, so the field order
/// here is presumably part of the wire format shared with the sending side — confirm
/// before reordering or inserting fields.
#[derive(Serialize, Deserialize)]
pub struct WorkerQuery<'a> {
    /// Aligner configuration. The borrowed variant presumably lets the sender serialize
    /// without cloning the aligner — TODO confirm against the caller.
    pub aligner: RefOrOwned<'a, AlignerSelector>,
    /// Verbosity handed to `crate::setup_tracing` once the query has been decoded.
    pub log_level: VerbositySelector,
    /// Memory budget. The worker applies it as its address-space limit (`Resource::AS`,
    /// in bytes) and also passes it to both A* aligners.
    pub memory: usize,
    /// Sequences and ranges to align.
    pub query: AlignmentQuery,
    /// Context that, in this module, is only used to label tracing spans.
    pub metadata: WorkerQueryMetadata,
}

/// Descriptive information accompanying a [`WorkerQuery`].
///
/// In this module it is not used for the alignment itself, only for tracing.
#[derive(Serialize, Deserialize)]
pub struct WorkerQueryMetadata {
    /// Region being processed (presumably the cluster's genomic span); rendered with
    /// `Display` as the `pos` field of the worker's tracing spans.
    pub cluster_region: GenomeRegion,
}

pub fn run() {
    match run_fallible() {
        Ok(()) => (),
        Err(e) => error!("Error in worker: {e}"),
    }
}

/// Read the query from stdin and write the result to stdout.
/// Errors are reported either to stderr, through non-zero exit or abort.
///
/// Steps:
/// 1. Decode a [`WorkerQuery`] (MessagePack) from stdin.
/// 2. Initialise tracing with the log level carried by the query.
/// 3. Cap this process's address space (`Resource::AS`) at `memory` bytes.
/// 4. Run the selected aligner. For the A* pair, the alignment without template switches
///    is only computed when the template-switch alignment's `template_switch_amount`
///    statistic is positive.
/// 5. Encode the result as MessagePack on stdout and flush it.
///
/// # Errors
///
/// Returns an error if the query cannot be decoded, the resource limit cannot be set,
/// or the result cannot be encoded to or flushed on stdout.
#[instrument]
pub fn run_fallible() -> anyhow::Result<()> {
    // Tracing is only set up below, from the query's own log level, so an `error!` for
    // this failure would presumably reach no subscriber and be lost. Report it on stderr.
    let wq = read_from_stdin()
        .inspect_err(|e| eprintln!("Error in worker: could not read query from stdin: {e}"))?;

    crate::setup_tracing(wq.log_level);
    let span = info_span!("Aligning sequences", pos = %wq.metadata.cluster_region).entered();

    // The address-space limit is given in bytes; soft and hard limit are set alike.
    let bytes = wq.memory as u64;
    rlimit::setrlimit(Resource::AS, bytes, bytes)
        .inspect_err(|e| error!("Can't set resource limit: {e}"))?;

    let result = match &*wq.aligner {
        AlignerSelector::AStar(AStarAlignerPair { ts, no_ts }) => {
            let with_ts = ts.align(
                "reference",
                &wq.query.sequences.reference,
                "query",
                &wq.query.sequences.query,
                Some(wq.query.ranges.clone()),
                None,
                Some(wq.memory),
            );
            // The second alignment is only run when the first one reports a positive
            // template-switch amount.
            let without_ts = if with_ts.statistics().template_switch_amount.const_raw() > 0.0 {
                Some(no_ts.align(
                    "reference",
                    &wq.query.sequences.reference,
                    "query",
                    &wq.query.sequences.query,
                    // Last use of `ranges`: move it instead of cloning again.
                    Some(wq.query.ranges),
                    None,
                    Some(wq.memory),
                ))
            } else {
                None
            };
            result::from_tsalign(with_ts, without_ts)
        }
        AlignerSelector::Fpa(four_point_aligner) => four_point_aligner.align(
            wq.query.sequences.reference,
            wq.query.sequences.query,
            wq.query.ranges,
        ),
    };
    span.exit();

    let _span = info_span!("Reporting alignment", pos = %wq.metadata.cluster_region).entered();
    // `StdoutLock` is line-buffered, which suits binary output poorly, so buffer the
    // encoder's writes. Flush explicitly: a flush performed on drop or at process exit
    // discards write errors (e.g. the reading end of the pipe was closed).
    let mut out = std::io::BufWriter::new(std::io::stdout().lock());
    rmp_serde::encode::write(&mut out, &result)?;
    std::io::Write::flush(&mut out)?;
    std::mem::drop(out);

    Ok(())
}

/// Decode a [`WorkerQuery`] from the locked stdin.
///
/// If stdin is an interactive terminal, a warning is printed to stderr first: the worker
/// expects its input to be supplied by the parent `twitcher` process.
///
/// # Errors
///
/// Returns an error if the input cannot be decoded as a `WorkerQuery`.
fn read_from_stdin() -> anyhow::Result<WorkerQuery<'static>> {
    let input = std::io::stdin().lock();

    let interactive = input.is_terminal();
    if interactive {
        eprintln!(
            "The `worker` subcommand is not designed to be called directly but only as a subroutine of twitcher. You will most likely not get the expected functionality."
        );
    }

    Ok(rmp_serde::from_read(input)?)
}