use std::io::{self, Write};
use std::ops::Range;
use std::sync::Arc;
use crate::call::whole_genome::PerContig;
use crate::core::{governor, AlignedRead, ContigSet, CoreError, WorkingSet};
use crate::genomics::ReferenceView;
use crate::pileup::{PileupColumn, PileupEngine, PileupParams, ReadSource, SkipCounts};
pub const FEATURE_HEADER: &str = "#contig\tpos\tref\tdepth\traw_depth\ta\tc\tg\tt\t\
a_fwd\ta_rev\tc_fwd\tc_rev\tg_fwd\tg_rev\tt_fwd\tt_rev\tmean_bq\tmean_mapq";
pub fn write_feature_header<W: Write + ?Sized>(out: &mut W) -> io::Result<()> {
writeln!(out, "{FEATURE_HEADER}")
}
pub fn write_feature_row<W: Write + ?Sized>(
out: &mut W,
contig_name: &str,
col: &PileupColumn,
) -> io::Result<()> {
let depth = col.depth();
let ac = col.allele_counts();
let sc = col.strand_counts();
let (sum_bq, sum_mapq) = col.obs.iter().fold((0u64, 0u64), |(b, m), o| {
(b + o.base_qual as u64, m + o.mapq as u64)
});
let (mean_bq, mean_mapq) = if depth > 0 {
(sum_bq as f64 / depth as f64, sum_mapq as f64 / depth as f64)
} else {
(0.0, 0.0)
};
writeln!(
out,
"{contig}\t{pos}\t{refb}\t{depth}\t{raw}\t\
{a}\t{c}\t{g}\t{t}\t\
{afwd}\t{arev}\t{cfwd}\t{crev}\t{gfwd}\t{grev}\t{tfwd}\t{trev}\t\
{mean_bq:.2}\t{mean_mapq:.2}",
contig = contig_name,
pos = col.locus.pos.0 as u64 + 1,
refb = col.ref_base as char,
depth = depth,
raw = col.raw_depth,
a = ac[0],
c = ac[1],
g = ac[2],
t = ac[3],
afwd = sc[0][0],
arev = sc[0][1],
cfwd = sc[1][0],
crev = sc[1][1],
gfwd = sc[2][0],
grev = sc[2][1],
tfwd = sc[3][0],
trev = sc[3][1],
)
}
pub fn stream_features_region<S: ReadSource>(
source: S,
reference: Arc<[u8]>,
contig: u32,
region: Range<u32>,
pileup_params: PileupParams,
on_row: &mut dyn FnMut(&PileupColumn) -> Result<(), CoreError>,
) -> Result<(WorkingSet, SkipCounts), CoreError> {
let mut engine = PileupEngine::new(source, reference, contig, region, pileup_params);
let mut max_ws = WorkingSet { bytes: 0 };
while let Some(column) = engine.next() {
let column = column?;
governor::checkpoint()?;
let ws = engine.current_working_set();
if ws.bytes > max_ws.bytes {
max_ws = ws;
}
on_row(&column)?;
}
Ok((max_ws, engine.skip_counts()))
}
pub fn stream_features_whole_genome<S: ReadSource>(
mut source: S,
ref_view: &ReferenceView,
contigs: &ContigSet,
pileup_params: PileupParams,
on_row: &mut dyn FnMut(&PileupColumn, &str) -> Result<(), CoreError>,
) -> Result<(WorkingSet, SkipCounts), CoreError> {
let mut max_ws = WorkingSet { bytes: 0 };
let mut skips = SkipCounts::default();
let mut peeked: Option<AlignedRead> = None;
for c in contigs.iter() {
governor::checkpoint()?;
let start = c.global_offset as usize;
let end = c.global_offset as usize + c.length as usize;
let reference: Arc<[u8]> = ref_view.decode_window_arc(start, end);
let per = PerContig {
source: &mut source,
contig: c.id,
peeked: &mut peeked,
};
let region: Range<u32> = 0..c.length;
let (ws, contig_skips) = stream_features_region(
per,
reference,
c.id,
region,
pileup_params.clone(),
&mut |col| on_row(col, &c.name),
)?;
if ws.bytes > max_ws.bytes {
max_ws = ws;
}
skips.accumulate(&contig_skips);
}
Ok((max_ws, skips))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::{CigarOp, CigarOpKind, Position, SamFlags};
use crate::pileup::SliceSource;
fn mread(pos: u32, seq: &[u8], reverse: bool) -> AlignedRead {
let flags = if reverse {
SamFlags(SamFlags::REVERSE)
} else {
SamFlags::default()
};
AlignedRead {
contig: 0,
pos: Position(pos),
mapq: 60,
flags,
cigar: vec![CigarOp::new(CigarOpKind::Match, seq.len() as u32)],
seq: Arc::from(seq.to_vec().into_boxed_slice()),
qual: Arc::from(vec![30u8; seq.len()].into_boxed_slice()),
}
}
fn rows_for(reads: Vec<AlignedRead>, reference: &[u8]) -> String {
let mut out: Vec<u8> = Vec::new();
write_feature_header(&mut out).unwrap();
stream_features_region(
SliceSource::new(reads),
Arc::from(reference.to_vec().into_boxed_slice()),
0,
0..reference.len() as u32,
PileupParams::default(),
&mut |col| write_feature_row(&mut out, "chr1", col).map_err(CoreError::from),
)
.unwrap();
String::from_utf8(out).unwrap()
}
#[test]
fn feature_row_has_the_expected_fields() {
let reference = b"AAAA";
let reads = vec![mread(0, b"AAAA", false), mread(0, b"CAAA", true)];
let text = rows_for(reads, reference);
let line0 = text.lines().nth(1).expect("a data row"); let f: Vec<&str> = line0.split('\t').collect();
assert_eq!(f[0], "chr1");
assert_eq!(f[1], "1"); assert_eq!(f[2], "A");
assert_eq!(f[3], "2"); assert_eq!(f[5], "1"); assert_eq!(f[6], "1"); assert_eq!(f[9], "1"); assert_eq!(f[12], "1"); assert_eq!(f[17], "30.00"); assert_eq!(f[18], "60.00"); assert_eq!(f.len(), 19);
}
#[test]
fn feature_stream_is_deterministic_and_bounded() {
use crate::core::MemoryBudget;
let reference = vec![b'A'; 50_000];
let reads: Vec<AlignedRead> = (0..50_000u32).map(|p| mread(p, b"C", false)).collect();
let a = rows_for(reads.clone(), &reference);
let b = rows_for(reads, &reference);
assert_eq!(a, b, "feature output must be byte-identical run-to-run");
let reference2 = vec![b'A'; 50_000];
let reads2: Vec<AlignedRead> = (0..50_000u32).map(|p| mread(p, b"C", false)).collect();
let mut max_ws = 0u64;
let (ws, _skips) = stream_features_region(
SliceSource::new(reads2),
Arc::from(reference2.into_boxed_slice()),
0,
0..50_000,
PileupParams::default(),
&mut |_col| Ok(()),
)
.unwrap();
max_ws = max_ws.max(ws.bytes);
assert!(
ws.fits(MemoryBudget::from_mb(1)),
"feature working set {max_ws} should fit 1 MiB"
);
}
}