pub mod auto;
pub mod mt;
pub mod reading;
pub mod xy;
use std::{collections::HashMap, sync::Arc};
use clap::Parser;
use indicatif::ParallelProgressIterator;
use rayon::prelude::*;
use crate::{common, freqs};
use reading::ContigMap;
#[derive(Parser, Debug, Clone)]
#[command(about = "Construct sequence variant frequencies database", long_about = None)]
pub struct Args {
#[arg(long, value_enum)]
pub genome_release: common::cli::GenomeRelease,
#[arg(long)]
pub path_out_rocksdb: String,
#[arg(long)]
pub path_gnomad_exomes_auto: Vec<String>,
#[arg(long)]
pub path_gnomad_genomes_auto: Vec<String>,
#[arg(long)]
pub path_gnomad_exomes_xy: Vec<String>,
#[arg(long)]
pub path_gnomad_genomes_xy: Vec<String>,
#[arg(long)]
pub path_gnomad_mtdna: Option<String>,
#[arg(long)]
pub path_helixmtdb: Option<String>,
#[arg(long)]
pub path_wal_dir: Option<String>,
#[arg(long, default_value = "100000")]
pub tbi_window_size: usize,
#[arg(long)]
pub gnomad_genomes_version: String,
#[arg(long)]
pub gnomad_exomes_version: String,
#[arg(long)]
pub gnomad_mtdna_version: String,
#[arg(long)]
pub helixmtdb_version: String,
}
fn assign_to_chrom(
paths: &Vec<String>,
assembly: hgvs::static_data::Assembly,
) -> Result<HashMap<usize, String>, anyhow::Error> {
let contig_map = ContigMap::new(assembly);
let mut res = HashMap::new();
for path in paths {
let mut reader = noodles_vcf::indexed_reader::Builder::default().build_from_path(path)?;
let header = Box::new(reader.read_header()?);
freqs::cli::import::reading::guess_assembly(header.as_ref(), true, Some(assembly))?;
let record = reader
.records(header.as_ref())
.next()
.transpose()?
.ok_or(anyhow::anyhow!("No records in VCF file {}", path))?;
let k = contig_map.chrom_to_idx(record.chromosome());
let v = path.clone();
res.insert(k, v);
}
Ok(res)
}
pub fn build_windows(
genome_release: hgvs::static_data::Assembly,
tbi_window_size: usize,
paths: &[String],
) -> Result<Vec<(String, usize, usize)>, anyhow::Error> {
let mut result = Vec::new();
for path in paths.iter() {
let tabix_src = format!("{}.tbi", path);
let index = noodles_tabix::read(tabix_src)?;
let header = index.header().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::InvalidInput, "missing tabix header")
})?;
let canonical_header_chroms = header
.reference_sequence_names()
.iter()
.filter_map(|chrom| {
let canon_chrom = chrom.strip_prefix("chr").unwrap_or(chrom);
if common::cli::is_canonical(canon_chrom) {
Some((common::cli::canonicalize(canon_chrom), chrom.clone()))
} else {
None
}
})
.collect::<std::collections::HashMap<String, String>>();
result.append(
&mut common::cli::build_genome_windows(genome_release, Some(tbi_window_size))?
.into_iter()
.filter_map(|(window_chrom, begin, end)| {
let canon_chrom = common::cli::canonicalize(&window_chrom);
canonical_header_chroms
.get(&canon_chrom)
.map(|header_chrom| (header_chrom.clone(), begin, end))
})
.collect::<Vec<_>>(),
);
}
result.sort();
result.dedup();
Ok(result)
}
pub fn run(common: &common::cli::Args, args: &Args) -> Result<(), anyhow::Error> {
tracing::info!("Starting mehari frequency import ...");
tracing::info!(" common = {:#?}", &common);
tracing::info!(" args = {:#?}", &args);
let genome_release = match args.genome_release {
common::cli::GenomeRelease::Grch37 => hgvs::static_data::Assembly::Grch37p10, common::cli::GenomeRelease::Grch38 => hgvs::static_data::Assembly::Grch38,
};
tracing::info!("Opening RocksDB for writing ...");
let before_opening_rocksdb = std::time::Instant::now();
let options = rocksdb_utils_lookup::tune_options(
rocksdb::Options::default(),
args.path_wal_dir.as_ref().map(|s| s.as_ref()),
);
let cf_names = ["meta", "autosomal", "gonosomal", "mitochondrial"];
let db = Arc::new(rocksdb::DB::open_cf_with_opts(
&options,
&args.path_out_rocksdb,
cf_names
.iter()
.map(|name| (name.to_string(), options.clone()))
.collect::<Vec<_>>(),
)?);
tracing::info!(" writing meta information");
let cf_meta = db.cf_handle("meta").unwrap();
db.put_cf(&cf_meta, "annonars-version", crate::VERSION)?;
db.put_cf(
&cf_meta,
"gnomad-exomes-version",
&args.gnomad_exomes_version,
)?;
db.put_cf(
&cf_meta,
"gnomad-genomoes-version",
&args.gnomad_genomes_version,
)?;
db.put_cf(&cf_meta, "gnomad-mtdna-version", &args.gnomad_mtdna_version)?;
db.put_cf(&cf_meta, "helixmtdb-version", &args.helixmtdb_version)?;
db.put_cf(
&cf_meta,
"genome-release",
format!("{}", args.genome_release),
)?;
tracing::info!(
"... done opening RocksDB for writing in {:?}",
before_opening_rocksdb.elapsed()
);
tracing::info!("Determine each file's chromosome (assuming one chrom per file)...");
let before_chroms = std::time::Instant::now();
let genomes_auto_by_chrom = assign_to_chrom(&args.path_gnomad_genomes_auto, genome_release)?;
let exomes_auto_by_chrom = assign_to_chrom(&args.path_gnomad_exomes_auto, genome_release)?;
let auto_keys = {
let mut auto_keys = Vec::new();
genomes_auto_by_chrom
.keys()
.for_each(|k| auto_keys.push(*k));
exomes_auto_by_chrom.keys().for_each(|k| auto_keys.push(*k));
auto_keys.sort();
auto_keys.dedup();
auto_keys
};
let genomes_xy_by_chrom = assign_to_chrom(&args.path_gnomad_genomes_xy, genome_release)?;
let exomes_xy_by_chrom = assign_to_chrom(&args.path_gnomad_exomes_xy, genome_release)?;
let xy_keys = {
let mut xy_keys = Vec::new();
genomes_xy_by_chrom.keys().for_each(|k| xy_keys.push(*k));
exomes_xy_by_chrom.keys().for_each(|k| xy_keys.push(*k));
xy_keys.sort();
xy_keys.dedup();
xy_keys
};
tracing::info!(
"... done getting chromosomes in {:?}",
before_chroms.elapsed()
);
tracing::info!("Importing autosomal variants...");
let before_auto = std::time::Instant::now();
for k in &auto_keys {
let path_genome = genomes_auto_by_chrom.get(k);
let path_exome = exomes_auto_by_chrom.get(k);
tracing::info!(" contig {} from:", common::cli::CANONICAL[*k]);
tracing::info!(" - genomes: {:?}", path_genome);
tracing::info!(" - exomes: {:?}", path_exome);
let paths = {
let mut paths = Vec::new();
if let Some(path_genome) = path_genome {
paths.push(path_genome.clone());
}
if let Some(path_exome) = path_exome {
paths.push(path_exome.clone());
}
paths
};
let windows = build_windows(genome_release, args.tbi_window_size, &paths)?;
windows
.par_iter()
.progress_with(common::cli::progress_bar(windows.len()))
.map(|(chrom, begin, end)| {
let start = noodles_core::position::Position::try_from(begin + 1)?;
let stop = noodles_core::position::Position::try_from(*end)?;
let region = noodles_core::region::Region::new(chrom, start..=stop);
auto::import_region(&db, path_genome, path_exome, ®ion)
})
.collect::<Result<Vec<_>, _>>()?;
}
tracing::info!(
"... done importing autosomal variants in {:?}",
before_auto.elapsed()
);
tracing::info!("Importing gonosomal variants...");
let before_xy = std::time::Instant::now();
for k in &xy_keys {
let path_genome = genomes_xy_by_chrom.get(k);
let path_exome = exomes_xy_by_chrom.get(k);
tracing::info!(" contig {} from:", common::cli::CANONICAL[*k]);
tracing::info!(" - genomes: {:?}", path_genome);
tracing::info!(" - exomes: {:?}", path_exome);
let paths = {
let mut paths = Vec::new();
if let Some(path_genome) = path_genome {
paths.push(path_genome.clone());
}
if let Some(path_exome) = path_exome {
paths.push(path_exome.clone());
}
paths
};
let windows = build_windows(genome_release, args.tbi_window_size, &paths)?;
windows
.par_iter()
.progress_with(common::cli::progress_bar(windows.len()))
.map(|(chrom, begin, end)| {
let start = noodles_core::position::Position::try_from(begin + 1)?;
let stop = noodles_core::position::Position::try_from(*end)?;
let region = noodles_core::region::Region::new(chrom, start..=stop);
xy::import_region(&db, path_genome, path_exome, ®ion)
})
.collect::<Result<Vec<_>, _>>()?;
}
tracing::info!(
"... done importing gonosomal variants in {:?}",
before_xy.elapsed()
);
tracing::info!("Importing mitochondrial variants...");
let before_mito = std::time::Instant::now();
let path_gnomad = args.path_gnomad_mtdna.as_ref();
let path_helix = args.path_helixmtdb.as_ref();
tracing::info!(" contig MT from:");
tracing::info!(" - gnomAD: {:?}", &path_gnomad);
tracing::info!(" - HelixMtDb: {:?}", &path_helix);
let paths = {
let mut paths = Vec::new();
if let Some(path_gnomad) = path_gnomad {
paths.push(path_gnomad.clone());
}
if let Some(path_helix) = path_helix {
paths.push(path_helix.clone());
}
paths
};
let windows = build_windows(genome_release, args.tbi_window_size, &paths)?;
windows
.par_iter()
.progress_with(common::cli::progress_bar(windows.len()))
.map(|(chrom, begin, end)| {
let start = noodles_core::position::Position::try_from(begin + 1)?;
let stop = noodles_core::position::Position::try_from(*end)?;
let region = noodles_core::region::Region::new(chrom, start..=stop);
mt::import_region(&db, path_gnomad, path_helix, ®ion)
})
.collect::<Result<Vec<_>, _>>()?;
tracing::info!(
"... done importing mitochondrial variants in {:?}",
before_mito.elapsed()
);
tracing::info!("Running RocksDB compaction ...");
let before_compaction = std::time::Instant::now();
rocksdb_utils_lookup::force_compaction_cf(&db, cf_names, Some(" "), true)?;
tracing::info!(
"... done compacting RocksDB in {:?}",
before_compaction.elapsed()
);
tracing::info!("All done. Have a nice day!");
Ok(())
}