use noodles::vcf::variant::record_buf::samples::sample::Value;
use noodles::vcf::variant::record_buf::{AlternateBases, RecordBuf, Samples, samples::Keys};
use std::io;
use tracing;
use crate::conversion::{ConversionConfig, Ploidy, determine_ploidy, format_genotype};
use crate::dtc::{self, Allele as DtcAllele, Record as DtcRecord};
use crate::external_sort::{
DtcExternalSorter, DtcOrder, RecordExternalSorter, RecordOrder, SortFormat, SortedRecordIter,
};
use crate::reference::ReferenceGenome;
use clap::ValueEnum;
/// Optional cap on the number of records to ingest, taken from the
/// `CONVERT_GENOME_MAX_RECORDS` environment variable.
///
/// Returns `None` when the variable is unset or does not parse as `usize`.
pub fn get_max_records_limit() -> Option<usize> {
    let raw = std::env::var("CONVERT_GENOME_MAX_RECORDS").ok()?;
    raw.parse().ok()
}
/// Input file format selected on the command line; `Auto` asks the tool to
/// detect the format from file content / extension (see `InputFormat::detect`).
#[derive(Debug, Clone, Copy, Eq, PartialEq, ValueEnum)]
pub enum InputFormat {
    // Direct-to-consumer genotype text format (the default fallback).
    Dtc,
    Vcf,
    Bcf,
    // Detect the format at runtime.
    Auto,
}
impl InputFormat {
    /// Guesses the input format for `path`.
    ///
    /// Strategy: sniff the leading bytes first (a "BCF" magic prefix or a
    /// `##fileformat=VCF` header line), then fall back to a case-insensitive
    /// extension check, and finally default to the DTC text format.
    pub fn detect(path: &std::path::Path) -> Self {
        use std::io::BufRead;
        // Content sniffing through the project's smart reader (presumably it
        // transparently handles compressed inputs — confirm in smart_reader).
        if let Ok(mut reader) = crate::smart_reader::open_input(path) {
            if let Ok(buf) = reader.fill_buf() {
                if buf.starts_with(b"BCF") {
                    return Self::Bcf;
                }
                if buf.starts_with(b"##fileformat=VCF") {
                    return Self::Vcf;
                }
            }
        }
        // Extension-based fallback when sniffing was inconclusive.
        if let Some(name) = path.file_name() {
            let lower = name.to_string_lossy().to_lowercase();
            if lower.ends_with(".vcf") || lower.ends_with(".vcf.gz") {
                return Self::Vcf;
            }
            if lower.ends_with(".bcf") || lower.ends_with(".bcf.gz") {
                return Self::Bcf;
            }
        }
        Self::Dtc
    }
}
/// A pull-based stream of converted VCF record buffers produced from some
/// input format.
pub trait VariantSource {
    /// Returns the next converted record, updating counters on `summary`
    /// as records are consumed or skipped; `None` once the source is
    /// exhausted.
    fn next_variant(
        &mut self,
        summary: &mut crate::ConversionSummary,
    ) -> Option<io::Result<RecordBuf>>;
}
/// Forwarding impl so a `Box<dyn VariantSource>` is itself usable as a
/// `VariantSource`.
impl<T: VariantSource + ?Sized> VariantSource for Box<T> {
    fn next_variant(
        &mut self,
        summary: &mut crate::ConversionSummary,
    ) -> Option<io::Result<RecordBuf>> {
        // Delegate to the boxed value.
        T::next_variant(&mut **self, summary)
    }
}
/// Variant source backed by a direct-to-consumer (DTC) genotype file.
pub struct DtcSource {
    // Externally sorted DTC records awaiting conversion.
    records: crate::external_sort::DtcSortedIter,
    // Optional reference genome used to resolve contig names and REF bases.
    reference: Option<ReferenceGenome>,
    config: ConversionConfig,
    // FORMAT keys for the single output sample (just "GT").
    header_keys: Keys,
    // Parse errors counted while loading; folded into the summary exactly
    // once, on the first `next_variant` call.
    initial_parse_errors: usize,
    initial_stats_synced: bool,
    // Strand the source data is assumed to be reported on.
    inferred_strand: crate::source_ref::InferredStrand,
}
impl DtcSource {
    /// Reads every DTC record from `reader` into an external sorter and
    /// returns a source that yields converted records in sorted order.
    ///
    /// Parse failures are counted rather than fatal; they are surfaced later
    /// through `ConversionSummary::parse_errors`. When `inferred_strand` is
    /// `None`, forward strand is assumed. Honors the optional record cap from
    /// `CONVERT_GENOME_MAX_RECORDS` (see `get_max_records_limit`).
    pub fn new<R: std::io::BufRead>(
        reader: dtc::Reader<R>,
        reference: Option<ReferenceGenome>,
        config: ConversionConfig,
        inferred_strand: Option<crate::source_ref::InferredStrand>,
    ) -> io::Result<Self> {
        // The only FORMAT key emitted per sample is the genotype.
        let keys: Keys = vec![String::from("GT")].into_iter().collect();
        let max_records = get_max_records_limit();
        // Sort by the reference's contig order when available, otherwise keep
        // a natural ordering.
        let order = if let Some(ref r) = reference {
            DtcOrder::from_reference(r)
        } else {
            DtcOrder::Natural
        };
        let mut sorter = DtcExternalSorter::new(order);
        let mut parse_errors = 0;
        let mut records_read = 0;
        for res in reader {
            if let Some(limit) = max_records
                && records_read >= limit
            {
                tracing::info!(limit, "reached max_records limit, stopping read");
                break;
            }
            match res {
                Ok(rec) => {
                    sorter.push(rec)?;
                    records_read += 1;
                }
                Err(e) => {
                    // Malformed lines are skipped, not fatal.
                    parse_errors += 1;
                    tracing::warn!("failed to read input line: {}", e);
                }
            }
        }
        let records = sorter.finish()?;
        Ok(Self {
            records,
            reference,
            config,
            header_keys: keys,
            initial_parse_errors: parse_errors,
            initial_stats_synced: false,
            inferred_strand: inferred_strand.unwrap_or(crate::source_ref::InferredStrand::Forward),
        })
    }
    /// Converts one DTC record into a VCF `RecordBuf`.
    ///
    /// Returns `Ok(None)` when the record is skipped (unknown chromosome,
    /// reference lookup failure, invalid alleles, zero ploidy, genotype
    /// formatting failure, or a reference-only site when such sites are
    /// excluded); the matching counter on `summary` is incremented for each
    /// skip reason.
    fn convert_record(
        &self,
        record: DtcRecord,
        summary: &mut crate::ConversionSummary,
    ) -> io::Result<Option<RecordBuf>> {
        // Map the input chromosome name onto the reference's canonical contig
        // name; without a reference, pass the name through unchanged.
        let canonical_name = if let Some(ref r) = self.reference {
            if let Some(name) = r.resolve_contig_name(&record.chromosome) {
                name.to_string()
            } else {
                summary.unknown_chromosomes += 1;
                return Ok(None);
            }
        } else {
            record.chromosome.clone()
        };
        let position = record.position;
        // Look up the REF base; 'N' is the placeholder when no reference
        // genome was supplied.
        // NOTE(review): assumes `position` uses the same coordinate
        // convention as `Position::new` below — confirm in ReferenceGenome.
        let reference_base = if let Some(ref r) = self.reference {
            match r.base(&canonical_name, position) {
                Ok(base) => base,
                Err(e) => {
                    summary.reference_failures += 1;
                    tracing::warn!(
                        "reference lookup failed for {}:{}: {}",
                        canonical_name,
                        position,
                        e
                    );
                    return Ok(None);
                }
            }
        } else {
            'N'
        };
        let mut allele_states = match record.parse_alleles() {
            Ok(states) => states,
            Err(e) => {
                summary.invalid_genotypes += 1;
                tracing::warn!("invalid alleles for {}:{}: {}", canonical_name, position, e);
                return Ok(None);
            }
        };
        // Flip base alleles onto the forward strand when the source data was
        // inferred to be reported on the reverse strand.
        if self.inferred_strand == crate::source_ref::InferredStrand::Reverse {
            for allele in &mut allele_states {
                if let DtcAllele::Base(s) = allele {
                    *s = reverse_complement(s);
                }
            }
        }
        let ploidy = determine_ploidy(
            &canonical_name,
            position,
            self.config.sex.unwrap_or(crate::cli::Sex::Unknown),
            self.config.par_boundaries.as_ref(),
        );
        // Normalize the called alleles to the expected ploidy.
        let normalized_alleles = match ploidy {
            Ploidy::Zero => {
                summary.missing_genotype_records += 1;
                return Ok(None);
            }
            Ploidy::Haploid => {
                // A haploid site should carry one consistent call: keep the
                // allele when all non-missing calls agree, otherwise
                // (conflicting calls, or nothing called) emit missing.
                let unique: Vec<_> = allele_states
                    .iter()
                    .filter(|&a| *a != DtcAllele::Missing)
                    .cloned()
                    .collect();
                let mut uniq_vals = unique.clone();
                uniq_vals.dedup();
                if uniq_vals.is_empty() {
                    vec![DtcAllele::Missing]
                } else if uniq_vals.len() == 1 {
                    vec![uniq_vals[0].clone()]
                } else {
                    vec![DtcAllele::Missing]
                }
            }
            Ploidy::Diploid => allele_states,
        };
        // Collect distinct ALT alleles; indels become symbolic DEL/INS tags.
        let mut alt_bases = Vec::new();
        for allele in normalized_alleles.iter() {
            match allele {
                DtcAllele::Base(base) => {
                    let b = base.to_uppercase();
                    let r = reference_base.to_string().to_uppercase();
                    if b != r && !alt_bases.contains(&b) {
                        alt_bases.push(b);
                    }
                }
                DtcAllele::Deletion => {
                    let del = String::from("DEL");
                    if !alt_bases.contains(&del) {
                        alt_bases.push(del);
                    }
                }
                DtcAllele::Insertion => {
                    let ins = String::from("INS");
                    if !alt_bases.contains(&ins) {
                        alt_bases.push(ins);
                    }
                }
                DtcAllele::Missing => {}
            }
        }
        // Reference-only sites are dropped unless explicitly requested.
        if alt_bases.is_empty() && !self.config.include_reference_sites {
            summary.skipped_reference_sites += 1;
            return Ok(None);
        }
        // Symbolic alleles are wrapped in angle brackets per VCF convention.
        let formatted_alts: Vec<String> = alt_bases
            .iter()
            .map(|a| {
                if a == "DEL" || a == "INS" {
                    format!("<{}>", a)
                } else {
                    a.clone()
                }
            })
            .collect();
        let alternate_bases = AlternateBases::from(formatted_alts);
        let genotype_string = format_genotype(&normalized_alleles, reference_base, &alt_bases)
            .map_err(|e| {
                summary.invalid_genotypes += 1;
                io::Error::new(io::ErrorKind::InvalidData, e.to_string())
            });
        // A formatting failure skips the record rather than aborting the run.
        let genotype_string = match genotype_string {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!("genotype formatting failed: {}", e);
                return Ok(None); }
        };
        let ids = record.id.as_ref().map(|id| {
            [id.clone()]
                .into_iter()
                .collect::<noodles::vcf::variant::record_buf::Ids>()
        });
        // Single sample, single GT value (matches `header_keys`).
        let samples = Samples::new(
            self.header_keys.clone(),
            vec![vec![Some(Value::String(genotype_string))]],
        );
        // Records carrying symbolic alleles get IMPRECISE + SVTYPE INFO
        // fields so downstream tools can recognize them as structural.
        let mut info_map: noodles::vcf::variant::record_buf::Info = Default::default();
        let has_del = alt_bases.iter().any(|b| b == "DEL");
        let has_ins = alt_bases.iter().any(|b| b == "INS");
        if has_del || has_ins {
            info_map.insert(
                String::from("IMPRECISE"),
                Some(noodles::vcf::variant::record_buf::info::field::Value::Flag),
            );
            let svtype = if has_del && has_ins {
                "COMPLEX"
            } else if has_del {
                "DEL"
            } else {
                "INS"
            };
            info_map.insert(
                String::from("SVTYPE"),
                Some(noodles::vcf::variant::record_buf::info::field::Value::String(svtype.into())),
            );
        }
        Ok(Some(
            RecordBuf::builder()
                .set_reference_sequence_name(canonical_name)
                .set_variant_start(noodles::core::Position::new(position as usize).ok_or(
                    io::Error::new(io::ErrorKind::InvalidData, "invalid position"),
                )?)
                .set_ids(ids.unwrap_or_default()) .set_reference_bases(reference_base.to_string())
                .set_alternate_bases(alternate_bases)
                .set_info(info_map)
                .set_samples(samples)
                .build(),
        ))
    }
}
impl VariantSource for DtcSource {
    /// Pulls sorted DTC records and converts them until one yields an output
    /// record; skipped records only bump counters on `summary`.
    fn next_variant(
        &mut self,
        summary: &mut crate::ConversionSummary,
    ) -> Option<io::Result<RecordBuf>> {
        // Fold the load-time parse-error count into the summary exactly once.
        if !self.initial_stats_synced {
            self.initial_stats_synced = true;
            summary.parse_errors += self.initial_parse_errors;
        }
        loop {
            let record = self.records.next()?;
            summary.total_records += 1;
            match self.convert_record(record, summary) {
                Ok(Some(buf)) => break Some(Ok(buf)),
                Ok(None) => continue,
                Err(e) => break Some(Err(e)),
            }
        }
    }
}
use noodles::vcf;
use std::io::BufRead;
/// Variant source backed by a (possibly unsorted) VCF input.
pub struct VcfSource {
    // Externally sorted record buffers ready to be yielded.
    records: SortedRecordIter,
    // Parse errors counted while loading; folded into the summary exactly
    // once, on the first `next_variant` call.
    initial_parse_errors: usize,
    initial_stats_synced: bool,
}
impl VcfSource {
pub fn new_without_reference<R: BufRead>(mut reader: vcf::io::Reader<R>) -> io::Result<Self> {
let header = reader.read_header()?;
let max_records = get_max_records_limit();
let mut parse_errors = 0;
let mut records_read = 0;
let mut sorter =
RecordExternalSorter::new(header.clone(), SortFormat::Vcf, RecordOrder::Natural);
for result in reader.record_bufs(&header) {
if let Some(limit) = max_records
&& records_read >= limit
{
tracing::info!(limit, "reached max_records limit, stopping read");
break;
}
match result {
Ok(record) => {
sorter.push(record)?;
records_read += 1;
}
Err(e) => {
parse_errors += 1;
tracing::warn!("failed to read VCF record: {}", e);
}
}
}
let records = sorter.finish()?;
Ok(Self {
records,
initial_parse_errors: parse_errors,
initial_stats_synced: false,
})
}
pub fn new<R: BufRead>(
mut reader: vcf::io::Reader<R>,
reference: &ReferenceGenome,
) -> io::Result<Self> {
let header = reader.read_header()?;
let max_records = get_max_records_limit();
let mut parse_errors = 0;
let mut records_read = 0;
let order = RecordOrder::from_reference(reference);
let mut sorter = RecordExternalSorter::new(header.clone(), SortFormat::Vcf, order);
for result in reader.record_bufs(&header) {
if let Some(limit) = max_records
&& records_read >= limit
{
tracing::info!(limit, "reached max_records limit, stopping read");
break;
}
match result {
Ok(record) => {
sorter.push(record)?;
records_read += 1;
}
Err(e) => {
parse_errors += 1;
tracing::warn!("failed to read VCF record: {}", e);
}
}
}
let records = sorter.finish()?;
Ok(Self {
records,
initial_parse_errors: parse_errors,
initial_stats_synced: false,
})
}
}
impl VariantSource for VcfSource {
    /// Yields the next sorted VCF record, counting each one on `summary`.
    fn next_variant(
        &mut self,
        summary: &mut crate::ConversionSummary,
    ) -> Option<io::Result<RecordBuf>> {
        // Fold the load-time parse-error count into the summary exactly once.
        if !self.initial_stats_synced {
            self.initial_stats_synced = true;
            summary.parse_errors += self.initial_parse_errors;
        }
        let record = self.records.next()?;
        summary.total_records += 1;
        Some(record)
    }
}
use noodles::bcf;
/// Variant source backed by a (possibly unsorted) BCF input.
pub struct BcfSource {
    // Externally sorted record buffers ready to be yielded.
    records: SortedRecordIter,
    // Parse errors counted while loading; folded into the summary exactly
    // once, on the first `next_variant` call.
    initial_parse_errors: usize,
    initial_stats_synced: bool,
}
impl BcfSource {
pub fn new_without_reference<R: std::io::Read>(
mut reader: bcf::io::Reader<R>,
) -> io::Result<Self> {
let header = reader.read_header()?;
let max_records = get_max_records_limit();
let mut parse_errors = 0;
let mut records_read = 0;
let mut sorter =
RecordExternalSorter::new(header.clone(), SortFormat::Bcf, RecordOrder::Natural);
for result in reader.record_bufs(&header) {
if let Some(limit) = max_records
&& records_read >= limit
{
tracing::info!(limit, "reached max_records limit, stopping read");
break;
}
match result {
Ok(record) => {
sorter.push(record)?;
records_read += 1;
}
Err(e) => {
parse_errors += 1;
tracing::warn!("failed to read BCF record: {}", e);
}
}
}
let records = sorter.finish()?;
Ok(Self {
records,
initial_parse_errors: parse_errors,
initial_stats_synced: false,
})
}
pub fn new<R: std::io::Read>(
mut reader: bcf::io::Reader<R>,
reference: &ReferenceGenome,
) -> io::Result<Self> {
let header = reader.read_header()?;
let max_records = get_max_records_limit();
let mut parse_errors = 0;
let mut records_read = 0;
let order = RecordOrder::from_reference(reference);
let mut sorter = RecordExternalSorter::new(header.clone(), SortFormat::Bcf, order);
for result in reader.record_bufs(&header) {
if let Some(limit) = max_records
&& records_read >= limit
{
tracing::info!(limit, "reached max_records limit, stopping read");
break;
}
match result {
Ok(record) => {
sorter.push(record)?;
records_read += 1;
}
Err(e) => {
parse_errors += 1;
tracing::warn!("failed to read BCF record: {}", e);
}
}
}
let records = sorter.finish()?;
Ok(Self {
records,
initial_parse_errors: parse_errors,
initial_stats_synced: false,
})
}
}
impl VariantSource for BcfSource {
    /// Yields the next sorted BCF record, counting each one on `summary`.
    fn next_variant(
        &mut self,
        summary: &mut crate::ConversionSummary,
    ) -> Option<io::Result<RecordBuf>> {
        // Fold the load-time parse-error count into the summary exactly once.
        if !self.initial_stats_synced {
            self.initial_stats_synced = true;
            summary.parse_errors += self.initial_parse_errors;
        }
        let record = self.records.next()?;
        summary.total_records += 1;
        Some(record)
    }
}
/// Maps a contig name to a sortable key: numbered chromosomes sort by number,
/// X/Y/M(T) follow at 23/24/25, and anything else sorts last (rank 1000)
/// alphabetically by its uppercased name.
///
/// Fix over the previous version: the "chr" prefix is now stripped
/// case-insensitively in one pass (so "cHr7" works), and only a single prefix
/// is removed — the old chained `trim_start_matches` calls silently stripped
/// repeated prefixes like "chrChr1".
pub fn natural_contig_order(name: &str) -> (u32, String) {
    // Strip one leading "chr" in any case mix; `get` avoids panicking on a
    // non-ASCII char straddling the 3-byte boundary.
    let normalized = match name.get(..3) {
        Some(prefix) if prefix.eq_ignore_ascii_case("chr") => &name[3..],
        _ => name,
    };
    if let Ok(num) = normalized.parse::<u32>() {
        return (num, String::new());
    }
    match normalized.to_uppercase().as_str() {
        "X" => (23, String::new()),
        "Y" => (24, String::new()),
        "M" | "MT" => (25, String::new()),
        // Unknown contigs sort after everything else, case-insensitively.
        other => (1000, other.to_string()),
    }
}
/// Returns the reverse complement of a DNA string, preserving case.
/// Characters without a defined complement (e.g. 'N') pass through unchanged.
fn reverse_complement(s: &str) -> String {
    // Complement of one base; identity for anything unrecognized.
    fn complement(base: char) -> char {
        match base {
            'A' => 'T',
            'T' => 'A',
            'C' => 'G',
            'G' => 'C',
            'a' => 't',
            't' => 'a',
            'c' => 'g',
            'g' => 'c',
            other => other,
        }
    }
    let mut out = String::with_capacity(s.len());
    for c in s.chars().rev() {
        out.push(complement(c));
    }
    out
}