use std::sync::Arc;
use clap::Parser;
use prost::Message;
use crate::{common, genes::cli::data::clingen_gene};
pub mod clingen {
use std::{
collections::HashMap,
io::{BufRead as _, BufReader},
};
#[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)]
pub struct Region {
#[serde(alias = "#ISCA ID")]
pub isca_id: String,
#[serde(alias = "ISCA Region Name")]
pub isca_region_name: String,
#[serde(alias = "Genomic Location")]
pub genomic_location: String,
#[serde(alias = "Haploinsufficiency Score", deserialize_with = "parse_score")]
pub haploinsufficiency_score: Option<u32>,
#[serde(alias = "Triplosensitivity Score", deserialize_with = "parse_score")]
pub triplosensitivity_score: Option<u32>,
#[serde(alias = "Haploinsufficiency Disease ID")]
pub haploinsufficiency_disease_id: Option<String>,
#[serde(alias = "Triplosensitivity Disease ID")]
pub triplosensitivity_disease_id: Option<String>,
}
fn parse_score<'de, D>(d: D) -> Result<Option<u32>, D::Error>
where
D: serde::Deserializer<'de>,
{
let tmp: String = serde::Deserialize::deserialize(d)?;
if tmp.is_empty() || tmp == "Not yet evaluated" || tmp == "-1" {
Ok(None)
} else {
Ok(Some(tmp.parse().map_err(serde::de::Error::custom)?))
}
}
pub fn load_clingen(path: &str) -> Result<HashMap<String, Region>, anyhow::Error> {
tracing::info!(" loading ClinGen region curations from {}", path);
let mut result = HashMap::new();
let mut reader = std::fs::File::open(path)
.map_err(|e| anyhow::anyhow!("problem opening file: {}", e))
.map(BufReader::new)?;
{
let mut buf = String::new();
for _ in 0..5 {
reader.read_line(&mut buf)?;
buf.clear();
}
}
let mut csv_reader = csv::ReaderBuilder::new()
.delimiter(b'\t')
.has_headers(true)
.flexible(true)
.from_reader(reader);
for record in csv_reader.deserialize() {
let record: Region =
record.map_err(|e| anyhow::anyhow!("problem parsing record: {}", e))?;
if genomic_location_to_interval(&record.genomic_location).is_err() {
tracing::warn!(
"skipping region with invalid genomic location: {}",
record.genomic_location
);
continue;
}
result.insert(record.isca_id.clone(), record);
}
Ok(result)
}
pub fn genomic_location_to_interval(
genomic_location: &str,
) -> Result<bio::bio_types::genome::Interval, anyhow::Error> {
let mut parts = genomic_location.split(':');
let chrom = parts.next().ok_or_else(|| {
anyhow::anyhow!(
"could not parse chromosome from genomic location: {}",
genomic_location
)
})?;
let mut parts = parts
.next()
.ok_or_else(|| anyhow::anyhow!("could not parse region {}", genomic_location))?
.split('-');
let begin = parts
.next()
.unwrap()
.parse::<u64>()
.map_err(|e| anyhow::anyhow!("could not parse start position from: {}", e))?
.saturating_sub(1);
let end = parts
.next()
.unwrap()
.parse::<u64>()
.map_err(|e| anyhow::anyhow!("could not parse end position from: {}", e))?;
Ok(bio::bio_types::genome::Interval::new(
chrom.to_string(),
begin..end,
))
}
}
#[derive(Parser, Debug, Clone)]
#[command(about = "import region annotation data", long_about = None)]
pub struct Args {
#[arg(long, value_enum)]
pub genome_release: common::cli::GenomeRelease,
#[arg(long, required = true)]
pub path_in_clingen: String,
#[arg(long)]
pub path_out_rocksdb: String,
#[arg(long, default_value = "regions")]
pub cf_name: String,
#[arg(long)]
pub path_wal_dir: Option<String>,
}
fn tsv_import(
db: &rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>,
args: &Args,
) -> Result<(), anyhow::Error> {
let cf_data = db.cf_handle(&args.cf_name).unwrap();
let regions = clingen::load_clingen(&args.path_in_clingen)?;
for (_, region) in regions {
let clingen::Region {
isca_id,
isca_region_name,
genomic_location,
haploinsufficiency_score,
triplosensitivity_score,
haploinsufficiency_disease_id,
triplosensitivity_disease_id,
} = region;
let haploinsufficiency_score = clingen_gene::Score::try_from(haploinsufficiency_score)
.map_err(|e| anyhow::anyhow!("problem parsing haplosensitivity score: {}", e))?;
let triplosensitivity_score = clingen_gene::Score::try_from(triplosensitivity_score)
.map_err(|e| anyhow::anyhow!("problem parsing triplosensitivity score: {}", e))?;
let region = crate::pbs::regions::clingen::Region {
isca_id,
isca_region_name,
genomic_location,
haploinsufficiency_score: haploinsufficiency_score as i32,
triplosensitivity_score: triplosensitivity_score as i32,
haploinsufficiency_disease_id,
triplosensitivity_disease_id,
};
let key = format!("clingen:{}", ®ion.isca_id);
db.put_cf(&cf_data, key.as_bytes(), region.encode_to_vec())?;
}
Ok(())
}
pub fn run(common: &common::cli::Args, args: &Args) -> Result<(), anyhow::Error> {
tracing::info!("Starting 'regions import' command");
tracing::info!("common = {:#?}", &common);
tracing::info!("args = {:#?}", &args);
tracing::info!("Opening RocksDB for writing ...");
let before_opening_rocksdb = std::time::Instant::now();
let options = rocksdb_utils_lookup::tune_options(
rocksdb::Options::default(),
args.path_wal_dir.as_ref().map(|s| s.as_ref()),
);
let cf_names = &["meta", &args.cf_name];
let db = Arc::new(rocksdb::DB::open_cf_with_opts(
&options,
common::readlink_f(&args.path_out_rocksdb)?,
cf_names
.iter()
.map(|name| (name.to_string(), options.clone()))
.collect::<Vec<_>>(),
)?);
tracing::info!(" writing meta information");
let cf_meta = db.cf_handle("meta").unwrap();
db.put_cf(&cf_meta, "annonars-version", crate::VERSION)?;
db.put_cf(
&cf_meta,
"genome-release",
format!("{}", args.genome_release),
)?;
db.put_cf(&cf_meta, "db-name", "regions")?;
tracing::info!(
"... done opening RocksDB for writing in {:?}",
before_opening_rocksdb.elapsed()
);
tracing::info!("Importing TSV files ...");
let before_import = std::time::Instant::now();
tsv_import(&db, args)?;
tracing::info!(
"... done importing TSV files in {:?}",
before_import.elapsed()
);
tracing::info!("Running RocksDB compaction ...");
let before_compaction = std::time::Instant::now();
rocksdb_utils_lookup::force_compaction_cf(&db, cf_names, Some(" "), true)?;
tracing::info!(
"... done compacting RocksDB in {:?}",
before_compaction.elapsed()
);
tracing::info!("All done. Have a nice day!");
Ok(())
}
#[cfg(test)]
mod test {
use super::*;
use clap_verbosity_flag::Verbosity;
use temp_testdir::TempDir;
#[test]
fn smoke_test_import() {
let tmp_dir = TempDir::default();
let common = common::cli::Args {
verbose: Verbosity::new(1, 0),
};
let args = Args {
genome_release: common::cli::GenomeRelease::Grch37,
path_in_clingen: String::from(
"tests/regions/clingen/ClinGen_region_curation_list_GRCh37.tsv",
),
path_out_rocksdb: format!("{}", tmp_dir.join("out-rocksdb").display()),
cf_name: String::from("regions"),
path_wal_dir: None,
};
run(&common, &args).unwrap();
}
}