use std::sync::Arc;
use clap::Parser;
use indicatif::ParallelProgressIterator;
use prost::Message;
use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};
use crate::{common, helixmtdb};
#[derive(Parser, Debug, Clone)]
#[command(about = "import HelixMtDb data into RocksDB", long_about = None)]
pub struct Args {
#[arg(long, value_enum)]
pub genome_release: common::cli::GenomeRelease,
#[arg(long, required = true)]
pub path_in_vcf: String,
#[arg(long)]
pub path_out_rocksdb: String,
#[arg(long, default_value = "100000")]
pub tbi_window_size: usize,
#[arg(long, default_value = "helixmtdb_data")]
pub cf_name: String,
#[arg(long)]
pub path_wal_dir: Option<String>,
}
fn tsv_import(
db: Arc<rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>>,
args: &Args,
) -> Result<(), anyhow::Error> {
let tabix_src = format!("{}.tbi", args.path_in_vcf);
let index = noodles_tabix::read(tabix_src)?;
let header = index.header().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::InvalidInput, "missing tabix header")
})?;
let canonical_header_chroms = header
.reference_sequence_names()
.iter()
.filter_map(|chrom| {
let canon_chrom = chrom.strip_prefix("chr").unwrap_or(chrom);
if common::cli::is_canonical(canon_chrom) {
Some((common::cli::canonicalize(canon_chrom), chrom.clone()))
} else {
None
}
})
.collect::<std::collections::HashMap<String, String>>();
let windows =
common::cli::build_genome_windows(args.genome_release.into(), Some(args.tbi_window_size))?
.into_iter()
.filter_map(|(window_chrom, begin, end)| {
let canon_chrom = common::cli::canonicalize(&window_chrom);
canonical_header_chroms
.get(&canon_chrom)
.map(|header_chrom| (header_chrom.clone(), begin, end))
})
.collect::<Vec<_>>();
tracing::info!("Loading HelixMtDB VCF file into RocksDB...");
let before_loading = std::time::Instant::now();
windows
.par_iter()
.progress_with(common::cli::progress_bar(windows.len()))
.map(|(chrom, begin, end)| process_window(db.clone(), chrom, *begin, *end, args))
.collect::<Result<Vec<_>, _>>()?;
tracing::info!(
"... done loading HelixMtDB VCF file into RocksDB in {:?}",
before_loading.elapsed()
);
Ok(())
}
fn process_window(
db: Arc<rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>>,
chrom: &str,
begin: usize,
end: usize,
args: &Args,
) -> Result<(), anyhow::Error> {
let cf_helix = db.cf_handle(&args.cf_name).unwrap();
let mut reader =
noodles_vcf::indexed_reader::Builder::default().build_from_path(&args.path_in_vcf)?;
let header = reader.read_header()?;
let raw_region = format!("{}:{}-{}", chrom, begin + 1, end);
tracing::debug!(" processing region: {}", raw_region);
let region = raw_region.parse()?;
let query = match reader.query(&header, ®ion) {
Ok(result) => Ok(Some(result)),
Err(e) => {
let needle = "region reference sequence does not exist in reference sequences";
if e.to_string().contains(needle) {
Ok(None)
} else {
Err(e)
}
}
}?;
if let Some(query) = query {
for result in query {
let vcf_record = result?;
for allele_no in 0..vcf_record.alternate_bases().len() {
let key_buf: Vec<u8> =
common::keys::Var::from_vcf_allele(&vcf_record, allele_no).into();
let record = helixmtdb::pbs::Record::from_vcf_allele(&vcf_record, allele_no)?;
tracing::trace!(" record: {:?}", &record);
let record_buf = record.encode_to_vec();
db.put_cf(&cf_helix, &key_buf, &record_buf)?;
}
}
}
Ok(())
}
pub fn run(common: &common::cli::Args, args: &Args) -> Result<(), anyhow::Error> {
tracing::info!("Starting 'helixmtdb import' command");
tracing::info!("common = {:#?}", &common);
tracing::info!("args = {:#?}", &args);
tracing::info!("Opening RocksDB for writing ...");
let before_opening_rocksdb = std::time::Instant::now();
let options = common::rocks_utils::tune_options(
rocksdb::Options::default(),
args.path_wal_dir.as_ref().map(|s| s.as_ref()),
);
let cf_names = &["meta", &args.cf_name];
let db = Arc::new(rocksdb::DB::open_cf_with_opts(
&options,
&args.path_out_rocksdb,
cf_names
.iter()
.map(|name| (name.to_string(), options.clone()))
.collect::<Vec<_>>(),
)?);
tracing::info!(" writing meta information");
let cf_meta = db.cf_handle("meta").unwrap();
db.put_cf(&cf_meta, "annonars-version", crate::VERSION)?;
db.put_cf(
&cf_meta,
"genome-release",
format!("{}", args.genome_release),
)?;
tracing::info!(
"... done opening RocksDB for writing in {:?}",
before_opening_rocksdb.elapsed()
);
tsv_import(db.clone(), args)?;
tracing::info!("Running RocksDB compaction ...");
let before_compaction = std::time::Instant::now();
common::rocks_utils::force_compaction_cf(&db, cf_names, Some(" "), true)?;
tracing::info!(
"... done compacting RocksDB in {:?}",
before_compaction.elapsed()
);
tracing::info!("All done. Have a nice day!");
Ok(())
}
#[cfg(test)]
mod test {
use super::*;
use clap_verbosity_flag::Verbosity;
use temp_testdir::TempDir;
#[test]
fn smoke_test_import_helix() {
let tmp_dir = TempDir::default();
let common = common::cli::Args {
verbose: Verbosity::new(1, 0),
};
let args = Args {
genome_release: common::cli::GenomeRelease::Grch37,
path_in_vcf: String::from("tests/helixmtdb/example/helixmtdb.vcf.bgz"),
path_out_rocksdb: format!("{}", tmp_dir.join("out-rocksdb").display()),
cf_name: String::from("helixmtdb_data"),
path_wal_dir: None,
tbi_window_size: 1_000_000,
};
run(&common, &args).unwrap();
}
}