use std::ops::Range;
use std::sync::Arc;
use crate::call::{
call_germline, call_somatic, GermlineCall, GermlineParams, SomaticCall, SomaticParams,
};
use crate::core::{governor, CoreError, Locus, WorkingSet};
use crate::pileup::{PileupColumn, PileupEngine, PileupParams, ReadSource, SkipCounts};
pub fn call_germline_region_streaming<S: ReadSource>(
source: S,
reference: Arc<[u8]>,
contig: u32,
region: Range<u32>,
pileup_params: PileupParams,
germline_params: &GermlineParams,
on_row: &mut dyn FnMut((Locus, u8, GermlineCall)) -> Result<(), CoreError>,
) -> Result<(WorkingSet, SkipCounts), CoreError> {
let mut engine = PileupEngine::new(source, reference, contig, region, pileup_params);
let mut max_ws = WorkingSet { bytes: 0 };
while let Some(column) = engine.next() {
let column = column?;
governor::checkpoint()?;
let ws = engine.current_working_set();
if ws.bytes > max_ws.bytes {
max_ws = ws;
}
if let Some(call) = call_germline(&column, germline_params) {
on_row((column.locus, column.ref_base, call))?;
}
}
Ok((max_ws, engine.skip_counts()))
}
#[allow(clippy::type_complexity)] pub fn call_germline_region_tracked<S: ReadSource>(
source: S,
reference: Arc<[u8]>,
contig: u32,
region: Range<u32>,
pileup_params: PileupParams,
germline_params: &GermlineParams,
) -> Result<(Vec<(Locus, u8, GermlineCall)>, WorkingSet), CoreError> {
let mut out = Vec::new();
let (ws, _skips) = call_germline_region_streaming(
source,
reference,
contig,
region,
pileup_params,
germline_params,
&mut |row| {
out.push(row);
Ok(())
},
)?;
Ok((out, ws))
}
pub fn call_germline_region<S: ReadSource>(
source: S,
reference: Arc<[u8]>,
contig: u32,
region: Range<u32>,
pileup_params: PileupParams,
germline_params: &GermlineParams,
) -> Result<Vec<(Locus, u8, GermlineCall)>, CoreError> {
let (sites, _ws) = call_germline_region_tracked(
source,
reference,
contig,
region,
pileup_params,
germline_params,
)?;
Ok(sites)
}
pub fn call_somatic_region<T: ReadSource, N: ReadSource>(
tumor: T,
normal: N,
reference: Arc<[u8]>,
contig: u32,
region: Range<u32>,
pileup_params: PileupParams,
somatic_params: &SomaticParams,
) -> Result<Vec<(Locus, SomaticCall)>, CoreError> {
let tumor_cols: Vec<PileupColumn> = PileupEngine::new(
tumor,
Arc::clone(&reference),
contig,
region.clone(),
pileup_params.clone(),
)
.collect::<Result<_, _>>()?;
let normal_cols: Vec<PileupColumn> =
PileupEngine::new(normal, reference, contig, region, pileup_params)
.collect::<Result<_, _>>()?;
let mut out = Vec::new();
let mut n = 0usize;
for t in &tumor_cols {
while n < normal_cols.len() && normal_cols[n].locus.pos < t.locus.pos {
n += 1;
}
if n < normal_cols.len() && normal_cols[n].locus.pos == t.locus.pos {
if let Some(call) = call_somatic(t, &normal_cols[n], somatic_params) {
out.push((t.locus, call));
}
}
}
Ok(out)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::call::{Genotype, SomaticParams};
use crate::core::{AlignedRead, CigarOp, CigarOpKind, Position, SamFlags};
use crate::pileup::SliceSource;
fn read(pos: u32, seq: &[u8], reverse: bool) -> AlignedRead {
let flags = if reverse {
SamFlags(SamFlags::REVERSE)
} else {
SamFlags::default()
};
AlignedRead {
contig: 0,
pos: Position(pos),
mapq: 60,
flags,
cigar: vec![CigarOp::new(CigarOpKind::Match, seq.len() as u32)],
seq: Arc::from(seq.to_vec().into_boxed_slice()),
qual: Arc::from(vec![35u8; seq.len()].into_boxed_slice()),
}
}
#[test]
fn calls_a_het_snv_including_a_reverse_strand_read() {
let reference: Arc<[u8]> = Arc::from(b"AAAA".to_vec().into_boxed_slice());
let reads = vec![
read(0, b"AAAA", false), read(0, b"ACAA", false), read(0, b"ACAA", true), read(0, b"ACAA", false),
read(0, b"AAAA", false),
read(0, b"ACAA", true),
];
let source = SliceSource::new(reads);
let calls = call_germline_region(
source,
reference,
0,
0..4,
PileupParams::default(),
&GermlineParams::default(),
)
.unwrap();
assert_eq!(calls.len(), 1);
let (locus, ref_base, call) = &calls[0];
assert_eq!(locus.pos, Position(1));
assert_eq!(*ref_base, b'A');
assert_eq!(call.alt_base, b'C');
assert_eq!(call.genotype, Genotype::Het);
assert!(call.dp >= 6);
}
#[test]
fn pure_reference_yields_no_calls() {
let reference: Arc<[u8]> = Arc::from(b"ACGT".to_vec().into_boxed_slice());
let reads = vec![read(0, b"ACGT", false), read(0, b"ACGT", false)];
let source = SliceSource::new(reads);
let calls = call_germline_region(
source,
reference,
0,
0..4,
PileupParams::default(),
&GermlineParams::default(),
)
.unwrap();
assert!(calls.is_empty());
}
#[test]
fn calls_a_somatic_snv_from_tumor_normal_cowalk() {
let reference: Arc<[u8]> = Arc::from(b"AAAA".to_vec().into_boxed_slice());
let tumor: Vec<AlignedRead> = (0..6)
.map(|_| read(0, b"ACAA", false))
.chain((0..6).map(|_| read(0, b"AAAA", false)))
.collect();
let normal: Vec<AlignedRead> = (0..12).map(|_| read(0, b"AAAA", false)).collect();
let params = SomaticParams {
min_tumor_depth: 4,
min_normal_depth: 4,
min_tumor_af: 0.1,
max_normal_af: 0.05,
min_quality: 0.0,
seq_error_rate: 1e-3,
};
let calls = call_somatic_region(
SliceSource::new(tumor),
SliceSource::new(normal),
reference,
0,
0..4,
PileupParams::default(),
¶ms,
)
.unwrap();
assert_eq!(calls.len(), 1);
let (locus, call) = &calls[0];
assert_eq!(locus.pos, Position(1));
assert_eq!(call.ref_base, b'A');
assert_eq!(call.alt_base, b'C');
assert_eq!(call.normal_alt, 0);
}
#[test]
fn streaming_emits_same_sites_as_tracked_and_returns_working_set() {
let reference: Arc<[u8]> = Arc::from(b"AAAA".to_vec().into_boxed_slice());
let reads = vec![
read(0, b"ACAA", false),
read(0, b"ACAA", false),
read(0, b"AAAA", false),
read(0, b"ACAA", false),
];
let (collected, _ws) = call_germline_region_tracked(
SliceSource::new(reads.clone()),
Arc::clone(&reference),
0,
0..4,
PileupParams::default(),
&GermlineParams::default(),
)
.unwrap();
let mut streamed = Vec::new();
let (ws, _skips) = call_germline_region_streaming(
SliceSource::new(reads),
reference,
0,
0..4,
PileupParams::default(),
&GermlineParams::default(),
&mut |row| {
streamed.push(row);
Ok(())
},
)
.unwrap();
assert_eq!(streamed, collected, "streaming sites == tracked sites");
assert!(ws.bytes > 0, "working set tracked and non-zero");
}
}