use std::marker::PhantomData;
use bumpalo::Bump;
use color_eyre::eyre::Result;
use crate::record::{ReadStats, RecordView};
/// Type-state marker for a `Plan` that is still being assembled: `Plan<Logical>`
/// is the state that exposes `new` and the `BuildPlan` builder methods.
pub(crate) struct Logical;
/// Type-state marker for a compiled `Plan`: `Plan<Execution>` is what
/// `BuildPlan::compile` produces and the state that implements `Execute`.
pub(crate) struct Execution;
/// What a filter step does with a pair when exactly one of its two records is
/// rejected. When both records are rejected the pair is always dropped.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(crate) enum OrphanPolicy {
/// Stop processing the unit and emit nothing (the default).
#[default]
DropPair,
/// Continue the plan with the surviving record as a single-record unit.
#[allow(
dead_code,
reason = "orphan emission is supported by the plan model even though the current CLI keeps paired output conservative"
)]
EmitOrphan,
}
/// Arena handed to transforms so they can allocate byte slices whose lifetime
/// is tied to the borrow of the arena. Thin wrapper around `bumpalo::Bump`.
pub(crate) struct TransformArena {
// Backing bump allocator; every allocation and reset goes through it.
bump: Bump,
}
impl TransformArena {
    /// Creates an arena backed by a newly constructed [`Bump`].
    pub fn new() -> Self {
        let bump = Bump::new();
        Self { bump }
    }

    /// Resets the underlying bump allocator.
    ///
    /// Takes `&mut self`, so no slice previously returned by
    /// [`Self::alloc_slice_copy`] can still be borrowed when this is called.
    pub fn reset(&mut self) {
        self.bump.reset();
    }

    /// Copies `bytes` into the arena and returns a shared view of the copy that
    /// is valid for as long as the arena itself is borrowed.
    pub fn alloc_slice_copy<'a>(&'a self, bytes: &[u8]) -> &'a [u8] {
        // The allocator hands back a mutable slice; callers only get read access.
        let copied: &'a [u8] = self.bump.alloc_slice_copy(bytes);
        copied
    }
}
/// Typed reason returned by a `ReadFilter` when it rejects a record.
pub(crate) trait RejectionReason: std::fmt::Debug + Send + Sync + 'static {
/// Static identifier for this reason; filter steps pass it to the stats
/// when recording the rejection.
fn code(&self) -> &'static str;
}
/// A per-record predicate that either accepts a record or rejects it with a
/// typed reason.
pub(crate) trait ReadFilter: Send + Sync + 'static {
/// Reason type produced when a record is rejected.
type Reason: RejectionReason;
/// Returns `Ok(())` to keep `record`, or `Err(reason)` to reject it.
fn evaluate(&self, record: &RecordView<'_>) -> Result<(), Self::Reason>;
}
/// A per-record transform that cannot fail and cannot drop the record.
pub(crate) trait ReadTransform: Send + Sync + 'static {
/// Static identifier recorded in the stats each time the transform reports
/// that it was applied.
fn code(&self) -> &'static str;
/// Transforms `record`, optionally allocating from `arena`, and reports via
/// `TransformResult::applied` whether the transform counts as applied.
fn apply<'a>(&self, record: RecordView<'a>, arena: &'a TransformArena) -> TransformResult<'a>;
}
/// The unit of work flowing between execution steps: one record or a pair.
#[derive(Clone, Copy)]
pub(crate) enum ActiveUnit<'a> {
/// A single record (either single-record input or the survivor of a pair).
Single(RecordView<'a>),
/// Two records processed together.
Pair(RecordPair<'a>),
}
/// What a plan execution finally emits for one input unit.
#[derive(Clone, Copy)]
pub(crate) enum EmittedUnit<'a> {
/// Nothing is emitted (the unit was rejected or dropped).
None,
/// Exactly one record is emitted.
Single(RecordView<'a>),
/// Both records of a pair are emitted.
Pair(RecordPair<'a>),
}
/// Result of a `PairTransform`: the unit may stay a pair, become a single
/// record, or be dropped entirely.
pub(crate) enum PairTransformResult<'a> {
/// Processing continues with a pair.
Pair {
pair: RecordPair<'a>,
/// Whether the transform counts as applied (recorded in stats when true).
applied: bool,
},
/// Processing continues with a single record.
Single {
record: RecordView<'a>,
/// Whether the transform counts as applied (recorded in stats when true).
applied: bool,
},
/// The whole unit is rejected; `reason` is recorded as a rejection code.
#[allow(
dead_code,
reason = "pair-aware extension points may reject whole units even though merge-pairs currently keeps unmerged pairs"
)]
Drop {
reason: &'static str,
},
}
/// A stateful, fallible transform that operates on whole units and may change
/// their shape (pair to single) or drop them.
pub(crate) trait PairTransform: Send {
    /// Static identifier recorded in the stats when a result reports
    /// `applied: true`.
    fn code(&self) -> &'static str;

    /// Processes a pair, optionally allocating from `arena`.
    ///
    /// # Errors
    ///
    /// Implementations may fail; the error aborts execution of the unit.
    fn apply_pair<'a>(
        &mut self,
        pair: RecordPair<'a>,
        arena: &'a TransformArena,
    ) -> Result<PairTransformResult<'a>>;

    /// Processes a unit that is already a single record.
    ///
    /// The default implementation passes the record through untouched and
    /// reports that nothing was applied.
    fn apply_single<'a>(
        &mut self,
        record: RecordView<'a>,
        _arena: &'a TransformArena,
    ) -> Result<PairTransformResult<'a>> {
        let untouched = PairTransformResult::Single {
            record,
            applied: false,
        };
        Ok(untouched)
    }
}
/// Output of `ReadTransform::apply`.
pub(crate) struct TransformResult<'a> {
/// The record to carry forward to the next step.
pub record: RecordView<'a>,
/// Whether the transform counts as applied; when true the transform's code
/// is recorded in the stats.
pub applied: bool,
}
/// Control-flow result of one execution step.
pub(crate) enum StepOutcome<'a> {
/// Hand this unit to the next step (or emit it if this was the last step).
Continue(ActiveUnit<'a>),
/// Stop running steps and return this outcome immediately.
Stop(ExecutionOutcome<'a>),
}
/// Per-unit state shared by every step while one unit runs through a plan.
pub(crate) struct StepContext<'a, 'stats> {
// Arena that transforms may allocate from for this unit.
arena: &'a TransformArena,
// Stats sink that receives rejection and transform codes.
stats: &'stats mut ReadStats,
// Policy copied from the plan; consulted when one record of a pair is rejected.
orphan_policy: OrphanPolicy,
// Number of rejections recorded through this context so far.
rejection_count: usize,
}
impl StepContext<'_, '_> {
    /// Counts one rejection for this unit and forwards `code` to the stats.
    fn record_rejection(&mut self, code: &'static str) {
        self.rejection_count += 1;
        self.stats.record_rejected(code);
    }

    /// Forwards an applied-transform `code` to the stats.
    fn record_transform(&mut self, code: &'static str) {
        self.stats.record_transform(code);
    }

    /// Builds the outcome for this unit from `emitted` and the number of
    /// rejections recorded so far.
    fn outcome<'a>(&self, emitted: EmittedUnit<'a>) -> ExecutionOutcome<'a> {
        let rejection_count = self.rejection_count;
        ExecutionOutcome {
            emitted,
            rejection_count,
        }
    }
}
/// One compiled step of a plan. Steps are stored boxed inside `Plan` and run
/// in insertion order.
pub(crate) trait ExecutionStep: Send {
/// Processes `unit`, recording stats through `context`, and decides whether
/// execution continues with a (possibly changed) unit or stops.
fn apply<'a>(
&mut self,
unit: ActiveUnit<'a>,
context: &mut StepContext<'a, '_>,
) -> Result<StepOutcome<'a>>;
}
/// Adapter that runs a `ReadFilter` as an `ExecutionStep`.
pub(crate) struct FilterStep<F>(pub(crate) F);
impl<F> ExecutionStep for FilterStep<F>
where
    F: ReadFilter,
{
    /// Applies the wrapped filter to `unit`.
    ///
    /// A rejected single record is counted and stops execution with nothing
    /// emitted; pairs are delegated to [`apply_filter_to_pair`]. This step
    /// itself never returns an error.
    fn apply<'a>(
        &mut self,
        unit: ActiveUnit<'a>,
        context: &mut StepContext<'a, '_>,
    ) -> Result<StepOutcome<'a>> {
        let filter = &self.0;

        // Pairs have their own orphan-aware handling.
        let record = match unit {
            ActiveUnit::Pair(pair) => return Ok(apply_filter_to_pair(filter, pair, context)),
            ActiveUnit::Single(record) => record,
        };

        let outcome = if let Err(reason) = filter.evaluate(&record) {
            context.record_rejection(reason.code());
            StepOutcome::Stop(context.outcome(EmittedUnit::None))
        } else {
            StepOutcome::Continue(ActiveUnit::Single(record))
        };
        Ok(outcome)
    }
}
/// Applies `filter` to both records of `pair` and decides how the unit proceeds.
///
/// Both records are always evaluated before anything is recorded. Every
/// rejected record is counted (left before right). Outcomes:
///
/// * both pass: continue with the pair unchanged;
/// * both fail: stop with nothing emitted;
/// * exactly one fails: follow the context's [`OrphanPolicy`], either dropping
///   the pair or continuing with the surviving record as a single unit.
fn apply_filter_to_pair<'a, F>(
    filter: &F,
    pair: RecordPair<'a>,
    context: &mut StepContext<'a, '_>,
) -> StepOutcome<'a>
where
    F: ReadFilter,
{
    let left_verdict = filter.evaluate(&pair.left).map_err(|reason| reason.code());
    let right_verdict = filter.evaluate(&pair.right).map_err(|reason| reason.code());

    // Resolve the clear-cut cases first; what remains is a lone survivor.
    let survivor = match (left_verdict.err(), right_verdict.err()) {
        (None, None) => return StepOutcome::Continue(ActiveUnit::Pair(pair)),
        (Some(left_code), Some(right_code)) => {
            context.record_rejection(left_code);
            context.record_rejection(right_code);
            return StepOutcome::Stop(context.outcome(EmittedUnit::None));
        }
        (Some(left_code), None) => {
            context.record_rejection(left_code);
            pair.right
        }
        (None, Some(right_code)) => {
            context.record_rejection(right_code);
            pair.left
        }
    };

    match context.orphan_policy {
        OrphanPolicy::EmitOrphan => StepOutcome::Continue(ActiveUnit::Single(survivor)),
        OrphanPolicy::DropPair => StepOutcome::Stop(context.outcome(EmittedUnit::None)),
    }
}
/// Adapter that runs a `ReadTransform` as an `ExecutionStep`, applying it to
/// every record of the unit.
pub(crate) struct TransformStep<T>(pub(crate) T);
impl<T> ExecutionStep for TransformStep<T>
where
T: ReadTransform,
{
fn apply<'a>(
&mut self,
unit: ActiveUnit<'a>,
context: &mut StepContext<'a, '_>,
) -> Result<StepOutcome<'a>> {
let code = self.0.code();
let mut map_one = |record| {
let result = self.0.apply(record, context.arena);
if result.applied {
context.record_transform(code);
}
result.record
};
Ok(StepOutcome::Continue(match unit {
ActiveUnit::Single(record) => ActiveUnit::Single(map_one(record)),
ActiveUnit::Pair(pair) => ActiveUnit::Pair(RecordPair {
left: map_one(pair.left),
right: map_one(pair.right),
}),
}))
}
}
/// Adapter that runs a `PairTransform` as an `ExecutionStep`.
pub(crate) struct PairTransformStep<T>(pub(crate) T);
impl<T> ExecutionStep for PairTransformStep<T>
where
    T: PairTransform,
{
    /// Dispatches `unit` to the wrapped transform's pair or single entry point
    /// and turns the result into a step outcome.
    ///
    /// A `Drop` result is counted as one rejection and stops execution with
    /// nothing emitted. Otherwise execution continues with the returned unit,
    /// recording the transform's code when it reports `applied`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the wrapped transform.
    fn apply<'a>(
        &mut self,
        unit: ActiveUnit<'a>,
        context: &mut StepContext<'a, '_>,
    ) -> Result<StepOutcome<'a>> {
        let transform = &mut self.0;

        let result = match unit {
            ActiveUnit::Pair(pair) => transform.apply_pair(pair, context.arena),
            ActiveUnit::Single(record) => transform.apply_single(record, context.arena),
        }?;

        let (next, applied) = match result {
            PairTransformResult::Drop { reason } => {
                context.record_rejection(reason);
                return Ok(StepOutcome::Stop(context.outcome(EmittedUnit::None)));
            }
            PairTransformResult::Pair { pair, applied } => (ActiveUnit::Pair(pair), applied),
            PairTransformResult::Single { record, applied } => {
                (ActiveUnit::Single(record), applied)
            }
        };

        if applied {
            context.record_transform(transform.code());
        }
        Ok(StepOutcome::Continue(next))
    }
}
/// Conversion of an operation into a boxed `ExecutionStep`; this is the bound
/// `BuildPlan::step` places on the operations it accepts.
pub(crate) trait IntoExecutionStep {
/// Consumes the operation and returns the step that executes it.
fn into_execution_step(self) -> Box<dyn ExecutionStep>;
}
/// An ordered list of execution steps plus the orphan policy they share.
/// The type parameter `S` is a type-state marker (`Logical` or `Execution`).
pub(crate) struct Plan<S> {
// Steps in the order they were added; executed in this order.
steps: Vec<Box<dyn ExecutionStep>>,
// Policy handed to every step through the step context.
orphan_policy: OrphanPolicy,
// Zero-sized marker carrying the type-state.
_state: PhantomData<S>,
}
impl Plan<Logical> {
    /// Creates an empty logical plan with no steps and the default
    /// [`OrphanPolicy`].
    pub fn new() -> Self {
        let steps = Vec::new();
        let orphan_policy = OrphanPolicy::default();
        Self {
            steps,
            orphan_policy,
            _state: PhantomData,
        }
    }
}
/// Builder interface for assembling a plan and turning it into its executable
/// form.
pub(crate) trait BuildPlan: Sized {
/// The executable type produced by `compile`.
type Execution;
/// Appends `op` as the next step and returns the builder.
fn step<O>(self, op: O) -> Self
where
O: IntoExecutionStep;
/// Sets the orphan policy and returns the builder.
fn orphan_policy(self, policy: OrphanPolicy) -> Self;
/// Consumes the builder and produces the executable plan.
fn compile(self) -> Self::Execution;
}
impl BuildPlan for Plan<Logical> {
    type Execution = Plan<Execution>;

    /// Converts `op` into a boxed step and appends it after the existing steps.
    fn step<O>(mut self, op: O) -> Self
    where
        O: IntoExecutionStep,
    {
        let boxed = op.into_execution_step();
        self.steps.push(boxed);
        self
    }

    /// Replaces the plan's orphan policy, keeping the steps as they are.
    fn orphan_policy(self, policy: OrphanPolicy) -> Self {
        Self {
            orphan_policy: policy,
            ..self
        }
    }

    /// Moves the steps and policy into an executable plan; only the type-state
    /// marker changes.
    fn compile(self) -> Self::Execution {
        let Plan {
            steps,
            orphan_policy,
            ..
        } = self;
        Plan {
            steps,
            orphan_policy,
            _state: PhantomData,
        }
    }
}
/// Two records processed together as one unit. Steps and the emitted iterator
/// always handle `left` before `right`.
#[derive(Clone, Copy)]
pub(crate) struct RecordPair<'a> {
/// First record of the pair.
pub left: RecordView<'a>,
/// Second record of the pair.
pub right: RecordView<'a>,
}
/// Result of running one unit through a plan.
pub(crate) struct ExecutionOutcome<'a> {
// What the plan emitted for the unit (nothing, one record, or a pair).
emitted: EmittedUnit<'a>,
// How many rejections were recorded while processing the unit.
rejection_count: usize,
}
impl<'a> ExecutionOutcome<'a> {
pub fn emitted_unit(&self) -> EmittedUnit<'a> {
self.emitted
}
pub fn emitted(&self) -> impl Iterator<Item = RecordView<'a>> {
let records = match self.emitted {
EmittedUnit::None => [None, None],
EmittedUnit::Single(record) => [Some(record), None],
EmittedUnit::Pair(pair) => [Some(pair.left), Some(pair.right)],
};
records.into_iter().flatten()
}
pub fn rejection_count(&self) -> usize {
self.rejection_count
}
}
/// Runs one input unit of type `In` through a compiled plan.
pub(crate) trait Execute<'a, In> {
/// Executes every step on `input`, recording rejections and transforms in
/// `stats`, and returns what was emitted. The arena is borrowed exclusively
/// for `'a`, the same lifetime as the returned outcome.
fn execute(
&mut self,
input: In,
arena: &'a mut TransformArena,
stats: &mut ReadStats,
) -> Result<ExecutionOutcome<'a>>;
}
impl Plan<Execution> {
    /// Runs `unit` through every step in order.
    ///
    /// A step that returns `Stop` ends execution immediately with its outcome.
    /// If every step continues, the final unit is emitted together with the
    /// number of rejections recorded along the way.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by any step.
    fn execute_unit<'a>(
        &mut self,
        mut unit: ActiveUnit<'a>,
        arena: &'a TransformArena,
        stats: &mut ReadStats,
    ) -> Result<ExecutionOutcome<'a>> {
        let orphan_policy = self.orphan_policy;
        let mut context = StepContext {
            arena,
            stats,
            orphan_policy,
            rejection_count: 0,
        };

        for step in self.steps.iter_mut() {
            unit = match step.apply(unit, &mut context)? {
                StepOutcome::Stop(outcome) => return Ok(outcome),
                StepOutcome::Continue(next) => next,
            };
        }

        // Every step continued: whatever is left is what gets emitted.
        let emitted = match unit {
            ActiveUnit::Pair(pair) => EmittedUnit::Pair(pair),
            ActiveUnit::Single(record) => EmittedUnit::Single(record),
        };
        Ok(context.outcome(emitted))
    }
}
impl<'a> Execute<'a, RecordView<'a>> for Plan<Execution> {
    /// Runs a single record through the plan as an [`ActiveUnit::Single`].
    fn execute(
        &mut self,
        input: RecordView<'a>,
        arena: &'a mut TransformArena,
        stats: &mut ReadStats,
    ) -> Result<ExecutionOutcome<'a>> {
        let unit = ActiveUnit::Single(input);
        self.execute_unit(unit, arena, stats)
    }
}
impl<'a> Execute<'a, RecordPair<'a>> for Plan<Execution> {
    /// Runs a record pair through the plan as an [`ActiveUnit::Pair`].
    fn execute(
        &mut self,
        input: RecordPair<'a>,
        arena: &'a mut TransformArena,
        stats: &mut ReadStats,
    ) -> Result<ExecutionOutcome<'a>> {
        let unit = ActiveUnit::Pair(input);
        self.execute_unit(unit, arena, stats)
    }
}
#[cfg(test)]
mod tests {
    use super::{
        BuildPlan, Execute, ExecutionOutcome, FilterStep, IntoExecutionStep, Logical, OrphanPolicy,
        Plan, ReadFilter, ReadTransform, RecordPair, RejectionReason, TransformArena,
        TransformResult, TransformStep,
    };
    use crate::record::{ReadStats, RecordView};

    /// Rejection reason reported by [`MinLength`].
    #[derive(Debug)]
    struct TooShort;

    impl RejectionReason for TooShort {
        fn code(&self) -> &'static str {
            "too_short"
        }
    }

    /// Test filter that rejects records whose sequence is shorter than
    /// `min_length`.
    struct MinLength {
        min_length: usize,
    }

    impl MinLength {
        fn new(min_length: usize) -> Self {
            Self { min_length }
        }
    }

    impl ReadFilter for MinLength {
        type Reason = TooShort;

        fn evaluate(&self, record: &RecordView<'_>) -> Result<(), Self::Reason> {
            if record.sequence().len() < self.min_length {
                Err(TooShort)
            } else {
                Ok(())
            }
        }
    }

    impl IntoExecutionStep for MinLength {
        fn into_execution_step(self) -> Box<dyn super::ExecutionStep> {
            Box::new(FilterStep(self))
        }
    }

    /// Test transform that removes the first `amount` bytes from both the
    /// sequence and the quality string and always reports itself as applied.
    struct TrimPrefix {
        amount: usize,
    }

    impl TrimPrefix {
        fn new(amount: usize) -> Self {
            Self { amount }
        }
    }

    impl ReadTransform for TrimPrefix {
        fn code(&self) -> &'static str {
            "trim_prefix"
        }

        fn apply<'a>(
            &self,
            record: RecordView<'a>,
            _arena: &'a TransformArena,
        ) -> TransformResult<'a> {
            TransformResult {
                record: record
                    .with_sequence_and_quality(
                        &record.sequence()[self.amount..],
                        &record.quality()[self.amount..],
                    )
                    .expect("trim prefix should preserve equal sequence and quality lengths"),
                applied: true,
            }
        }
    }

    impl IntoExecutionStep for TrimPrefix {
        fn into_execution_step(self) -> Box<dyn super::ExecutionStep> {
            Box::new(TransformStep(self))
        }
    }

    /// Quality bytes shared by every fixture record; sliced down so the
    /// quality string always has the same length as the sequence.
    const FIXTURE_QUALITY: &[u8] = b"IIIIIIIIIIIIIIII";

    /// Builds a fixture record named `read1` whose quality string is exactly as
    /// long as `sequence`.
    ///
    /// Panics if `sequence` is longer than [`FIXTURE_QUALITY`], which signals
    /// that the constant needs extending rather than silently producing a
    /// record with mismatched lengths.
    fn record(sequence: &'static [u8]) -> RecordView<'static> {
        let quality = FIXTURE_QUALITY
            .get(..sequence.len())
            .expect("fixture sequences must not be longer than FIXTURE_QUALITY");
        RecordView::new(b"read1", sequence, quality)
    }

    #[test]
    fn logical_plan_accumulates_steps_and_compiles() {
        let _plan = Plan::<Logical>::new()
            .step(MinLength::new(4))
            .step(TrimPrefix::new(1))
            .compile();
    }

    #[test]
    fn single_execution_rejects_record_when_filter_fails() {
        let mut plan = Plan::<Logical>::new().step(MinLength::new(6)).compile();
        let mut arena = TransformArena::new();
        let mut stats = ReadStats::default();
        let outcome = plan
            .execute(record(b"ACGT"), &mut arena, &mut stats)
            .expect("single-record filter failure should produce a rejected outcome");
        assert_eq!(outcome.emitted().count(), 0);
        assert_eq!(outcome.rejection_count(), 1);
        assert_eq!(stats.rejection_counts.get("too_short"), Some(&1));
    }

    #[test]
    fn single_execution_applies_transform_in_order() {
        let mut plan = Plan::<Logical>::new().step(TrimPrefix::new(1)).compile();
        let mut arena = TransformArena::new();
        let mut stats = ReadStats::default();
        let outcome = plan
            .execute(record(b"ACGT"), &mut arena, &mut stats)
            .expect("single-record transform should produce an emitted outcome");
        assert_eq!(outcome.emitted().count(), 1);
        assert_eq!(
            outcome
                .emitted()
                .next()
                .expect("record should emit")
                .sequence(),
            b"CGT"
        );
    }

    #[test]
    fn paired_execution_drops_orphan_by_default() {
        let mut plan = Plan::<Logical>::new().step(MinLength::new(6)).compile();
        let mut arena = TransformArena::new();
        let mut stats = ReadStats::default();
        let outcome = plan
            .execute(
                RecordPair {
                    left: record(b"ACGTAC"),
                    right: record(b"ACGT"),
                },
                &mut arena,
                &mut stats,
            )
            .expect("paired filter failure should produce a rejected outcome");
        assert_eq!(outcome.emitted().count(), 0);
        assert_eq!(outcome.rejection_count(), 1);
        // Only the short mate is counted as rejected, even though both are dropped.
        assert_eq!(stats.rejection_counts.get("too_short"), Some(&1));
    }

    #[test]
    fn paired_execution_can_emit_orphan_when_policy_allows() {
        let mut plan = Plan::<Logical>::new()
            .step(MinLength::new(6))
            .orphan_policy(OrphanPolicy::EmitOrphan)
            .compile();
        let mut arena = TransformArena::new();
        let mut stats = ReadStats::default();
        let outcome = plan
            .execute(
                RecordPair {
                    left: record(b"ACGTAC"),
                    right: record(b"ACGT"),
                },
                &mut arena,
                &mut stats,
            )
            .expect("paired orphan policy should produce an emitted orphan outcome");
        assert_eq!(outcome.emitted().count(), 1);
        assert_eq!(outcome.rejection_count(), 1);
        assert_eq!(stats.rejection_counts.get("too_short"), Some(&1));
    }

    #[test]
    fn emitted_iterator_preserves_slot_order() {
        let outcome = ExecutionOutcome {
            emitted: super::EmittedUnit::Pair(RecordPair {
                left: record(b"AAAAAA"),
                right: record(b"CCCCCC"),
            }),
            rejection_count: 0,
        };
        let emitted = outcome
            .emitted()
            .map(|record| record.sequence())
            .collect::<Vec<_>>();
        assert_eq!(emitted, vec![b"AAAAAA".as_slice(), b"CCCCCC".as_slice()]);
    }
}