use std::io::IsTerminal;
use rlimit::Resource;
use serde::{Deserialize, Serialize};
use tracing::{error, info_span, instrument};
use crate::{
VerbositySelector,
common::{
aligner::{
AStarAlignerPair, AlignerSelector, AlignmentQuery,
result::{self},
},
coords::GenomeRegion,
},
};
/// A `T` that is either borrowed for the lifetime `'a` or held by value.
///
/// Lets a struct field accept a reference or an owned value without
/// forcing a clone at the call site.
pub enum RefOrOwned<'a, T> {
    /// A value borrowed from somewhere else.
    Ref(&'a T),
    /// A value stored directly inside the enum.
    Owned(T),
}
impl<T> std::ops::Deref for RefOrOwned<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
match self {
RefOrOwned::Ref(r) => r,
RefOrOwned::Owned(o) => o,
}
}
}
/// Serializes the inner `T` directly; the variant itself is not recorded
/// in the output.
impl<'a, T> Serialize for RefOrOwned<'a, T>
where
    T: Serialize,
{
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let inner: &T = match self {
            Self::Ref(borrowed) => *borrowed,
            Self::Owned(owned) => owned,
        };
        inner.serialize(serializer)
    }
}
/// Deserializes a `T` and wraps it in [`RefOrOwned::Owned`]; this impl never
/// produces the `Ref` variant.
impl<'de, 'a, T> Deserialize<'de> for RefOrOwned<'a, T>
where
    T: Deserialize<'de>,
{
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let value = T::deserialize(deserializer)?;
        Ok(Self::Owned(value))
    }
}
impl<T> From<T> for RefOrOwned<'_, T> {
fn from(value: T) -> Self {
Self::Owned(value)
}
}
impl<'a, T> From<&'a T> for RefOrOwned<'a, T> {
fn from(value: &'a T) -> Self {
Self::Ref(value)
}
}
/// Job description handed to a worker process.
///
/// `read_from_stdin` decodes a value of this type from stdin with `rmp_serde`.
// NOTE(review): field order and types are presumably part of the serialized
// layout shared with whoever spawns the worker — confirm before changing them.
#[derive(Serialize, Deserialize)]
pub struct WorkerQuery<'a> {
/// Aligner to run. May be borrowed or owned; deserialization always yields
/// the owned variant.
pub aligner: RefOrOwned<'a, AlignerSelector>,
/// Log level passed to `crate::setup_tracing` by the worker.
pub log_level: VerbositySelector,
/// Value the worker applies as its soft and hard `RLIMIT_AS` limit; it is
/// also passed as the last argument of the A* aligners' `align` calls.
pub memory: usize,
/// Sequences and ranges to align.
pub query: AlignmentQuery,
/// Extra information about the job that is not alignment input.
pub metadata: WorkerQueryMetadata,
}
/// Descriptive data that accompanies a [`WorkerQuery`].
#[derive(Serialize, Deserialize)]
pub struct WorkerQueryMetadata {
/// Region the job refers to; the worker records it as the `pos` field of
/// its tracing spans.
pub cluster_region: GenomeRegion,
}
pub fn run() {
match run_fallible() {
Ok(()) => (),
Err(e) => error!("Error in worker: {e}"),
}
}
/// Runs a single alignment job read from stdin and writes the result to stdout.
///
/// Steps, in order:
/// 1. Decode the [`WorkerQuery`] from stdin.
/// 2. Set up tracing with the log level carried by the query.
/// 3. Apply `wq.memory` as both the soft and the hard `RLIMIT_AS` limit.
/// 4. Run the aligner selected by the query.
/// 5. Encode the result with `rmp_serde` to stdout and flush it.
///
/// # Errors
///
/// Returns an error if the query cannot be decoded, if the resource limit
/// cannot be set, or if the result cannot be written or flushed to stdout.
#[instrument]
pub fn run_fallible() -> anyhow::Result<()> {
    // Tracing is only configured after the query is decoded, because the log
    // level is part of the query.
    let wq = read_from_stdin()?;
    crate::setup_tracing(wq.log_level);

    let span = info_span!("Aligning sequences", pos = %wq.metadata.cluster_region).entered();

    // `usize` is at most 64 bits wide on supported targets, so this cast is lossless.
    let bytes = wq.memory as u64;
    rlimit::setrlimit(Resource::AS, bytes, bytes)
        .inspect_err(|e| error!("Can't set resource limit: {e}"))?;

    let result = match &*wq.aligner {
        AlignerSelector::AStar(AStarAlignerPair { ts, no_ts }) => {
            let with_ts = ts.align(
                "reference",
                &wq.query.sequences.reference,
                "query",
                &wq.query.sequences.query,
                // Cloned because the second alignment below may need the ranges too.
                Some(wq.query.ranges.clone()),
                None,
                Some(wq.memory),
            );
            // The second alignment is only computed when the first one reports
            // a template-switch amount above zero.
            let without_ts = if with_ts.statistics().template_switch_amount.const_raw() > 0.0 {
                Some(no_ts.align(
                    "reference",
                    &wq.query.sequences.reference,
                    "query",
                    &wq.query.sequences.query,
                    // Last use of the ranges: move them instead of cloning again.
                    Some(wq.query.ranges),
                    None,
                    Some(wq.memory),
                ))
            } else {
                None
            };
            result::from_tsalign(with_ts, without_ts)
        }
        AlignerSelector::Fpa(four_point_aligner) => four_point_aligner.align(
            wq.query.sequences.reference,
            wq.query.sequences.query,
            wq.query.ranges,
        ),
    };
    span.exit();

    let _span = info_span!("Reporting alignment", pos = %wq.metadata.cluster_region).entered();
    let mut out = std::io::stdout().lock();
    rmp_serde::encode::write(&mut out, &result)?;
    // Stdout is buffered. Flush explicitly so that a failed write surfaces as
    // an error here rather than being silently ignored at process exit.
    std::io::Write::flush(&mut out)?;
    drop(out);
    Ok(())
}
/// Decodes the [`WorkerQuery`] for this process from standard input.
///
/// If stdin is a terminal, a notice is printed to stderr first, since the
/// worker expects its input from the parent process rather than a user.
///
/// # Errors
///
/// Returns an error if `rmp_serde` cannot decode a `WorkerQuery` from stdin.
fn read_from_stdin() -> anyhow::Result<WorkerQuery<'static>> {
    let stdin = std::io::stdin();
    if stdin.is_terminal() {
        eprintln!(
            "The `worker` subcommand is not designed to be called directly but only as a subroutine of twitcher. You will most likely not get the expected functionality."
        );
    }
    let query = rmp_serde::from_read(stdin.lock())?;
    Ok(query)
}