use std::sync::Arc;
use prost::Message;
use crate::{
common::{self, cli::extract_chrom, keys, spdi},
cons,
};
/// Command line arguments for the `cons query` sub command.
#[derive(clap::Parser, Debug, Clone)]
#[command(about = "query conservation data from RocksDB", long_about = None)]
pub struct Args {
    /// Path to the RocksDB directory with the data.
    #[arg(long)]
    pub path_rocksdb: String,
    /// Name of the column family to query.
    #[arg(long, default_value = "ucsc_conservation")]
    pub cf_name: String,
    /// Output file; use "-" to write to stdout.
    #[arg(long, default_value = "-")]
    pub out_file: String,
    /// Output format.
    #[arg(long, default_value = "jsonl")]
    pub out_format: common::cli::OutputFormat,
    // Query selection: exactly one of `--range` or `--all` (see `ArgsQuery`).
    #[command(flatten)]
    pub query: ArgsQuery,
    /// Only output records whose HGNC ID equals this value.
    #[arg(long)]
    pub hgnc_id: Option<String>,
}
// Argument group for the query selection; exactly one of `--range` or `--all` must be given.
#[derive(clap::Args, Debug, Clone, Default)]
#[group(required = true, multiple = false)]
pub struct ArgsQuery {
    /// Range to query for.
    #[arg(long, group = "query")]
    pub range: Option<spdi::Range>,
    /// Query all records.
    #[arg(long, group = "query")]
    pub all: bool,
}
/// Metadata read from the `meta` column family of the database.
#[derive(Debug)]
struct Meta {
    /// Value stored under the `genome-release` key of the `meta` column family.
    pub genome_release: String,
}
/// Open the RocksDB database read-only and load its metadata.
///
/// Opens `args.path_rocksdb` with the `meta` column family and the data column
/// family `args.cf_name`, then reads the `genome-release` entry from `meta`.
///
/// # Errors
///
/// Returns an error if the database cannot be opened, if `meta:genome-release`
/// is missing, or if its value is not valid UTF-8.
///
/// # Panics
///
/// Panics if no handle for the `meta` column family is available after opening.
fn open_rocksdb(
    args: &Args,
) -> Result<(Arc<rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>>, Meta), anyhow::Error> {
    tracing::info!("Opening RocksDB database ...");
    let open_started = std::time::Instant::now();

    let column_families = &["meta", args.cf_name.as_str()];
    let raw_db = rocksdb::DB::open_cf_for_read_only(
        &rocksdb::Options::default(),
        &args.path_rocksdb,
        column_families,
        true,
    )?;
    let db = Arc::new(raw_db);

    tracing::info!(" reading meta information");
    // The column family handle borrows `db`; keep it in its own scope so the
    // borrow ends before `db` is moved into the return value.
    let meta = {
        let meta_handle = db.cf_handle("meta").unwrap();
        let raw_release = db
            .get_cf(&meta_handle, "genome-release")?
            .ok_or_else(|| anyhow::anyhow!("missing value meta:genome-release"))?;
        Meta {
            genome_release: String::from_utf8(raw_release)?,
        }
    };
    tracing::info!(" meta:genome-release = {}", &meta.genome_release);

    tracing::info!(
        "... opening RocksDB database took {:?}",
        open_started.elapsed()
    );
    Ok((db, meta))
}
/// Write a single conservation record to `out_writer` in the requested format.
///
/// With `OutputFormat::Jsonl` the record is serialized to JSON and written as
/// one line.
///
/// # Errors
///
/// Returns an error if serialization or writing fails.
fn print_values(
    out_writer: &mut Box<dyn std::io::Write>,
    output_format: common::cli::OutputFormat,
    record: &cons::pbs::Record,
) -> Result<(), anyhow::Error> {
    match output_format {
        common::cli::OutputFormat::Jsonl => {
            let line = serde_json::to_string(record)?;
            writeln!(out_writer, "{}", line)?;
            Ok(())
        }
    }
}
/// Main entry point for the `cons query` sub command.
///
/// Opens the database at `args.path_rocksdb` and iterates the column family
/// `args.cf_name`. Without a range, iteration starts at the first key and covers
/// all keys. With `args.query.range`, it covers the keys up to the range stop,
/// and records ending before the range start are dropped. If `args.hgnc_id` is
/// given, only records with that HGNC ID are kept. Each remaining record is
/// written to `args.out_file` (`"-"` means stdout) in `args.out_format`.
///
/// `common` is only logged.
///
/// # Errors
///
/// Returns an error if the database cannot be opened, the output file cannot be
/// created, the chromosome of the query range cannot be resolved, a stored value
/// cannot be decoded, the iterator reports an error, or writing the output fails.
pub fn run(common: &common::cli::Args, args: &Args) -> Result<(), anyhow::Error> {
    tracing::info!("Starting 'cons query' command");
    tracing::info!("common = {:#?}", &common);
    tracing::info!("args = {:#?}", &args);

    let (db, meta) = open_rocksdb(args)?;
    let cf_data = db
        .cf_handle(&args.cf_name)
        .ok_or_else(|| anyhow::anyhow!("missing column family {}", &args.cf_name))?;

    let mut out_writer = match args.out_file.as_str() {
        "-" => Box::new(std::io::stdout()) as Box<dyn std::io::Write>,
        out_file => {
            let file = std::fs::File::create(out_file).map_err(|e| {
                anyhow::anyhow!("could not create output file {}: {}", out_file, e)
            })?;
            // Buffer file output; records are written one line at a time.
            Box::new(std::io::BufWriter::new(file)) as Box<dyn std::io::Write>
        }
    };

    tracing::info!("Running query...");
    let before_query = std::time::Instant::now();

    let (start, stop) = if let Some(range) = args.query.range.as_ref() {
        // Resolve the chromosome name against the database's genome release.
        let range = spdi::Range {
            sequence: extract_chrom::from_range(range, Some(&meta.genome_release))?,
            ..range.clone()
        };
        let (start, stop) = range.into();
        (Some(start), Some(stop))
    } else {
        (None, None)
    };
    tracing::debug!(" start = {:?}, stop = {:?}", &start, &stop);

    let mut iter = db.raw_iterator_cf(&cf_data);
    if let Some(start) = start.as_ref() {
        // NOTE(review): the seek key lies two positions before the query start,
        // presumably so records keyed just before the start (and possibly
        // overlapping it) are visited; records ending before the start are
        // filtered out below. Confirm the intended offset.
        let pos = keys::Pos {
            chrom: start.sequence.clone(),
            pos: start.position - 2,
        };
        let key: Vec<u8> = pos.into();
        tracing::debug!(" seeking to key {:?}", &key);
        iter.seek(&key);
    } else {
        iter.seek(b"")
    }

    let stop = stop.map(|stop| -> keys::Pos { stop.into() });
    if let Some(stop) = stop.as_ref() {
        let stop: Vec<u8> = stop.clone().into();
        tracing::debug!(" stop = {:?}", &stop);
    }

    while iter.valid() {
        if let Some(value) = iter.value() {
            tracing::trace!(" iterator at {:?} => {:?}", &iter.key(), &value);
            if let Some(stop) = stop.as_ref() {
                let iter_key = iter.key().unwrap();
                let iter_pos: keys::Pos = iter_key.into();
                if &iter_pos > stop {
                    break;
                }
            }

            let record_list = cons::pbs::RecordList::decode(value)?;
            for record in &record_list.records {
                // Skipping a record must NOT advance the RocksDB iterator: the
                // iterator is advanced exactly once per key after this loop, and
                // advancing it here would silently drop the following keys.
                if let Some(start) = start.as_ref() {
                    if record.stop < start.position {
                        continue;
                    }
                }
                if let Some(hgnc_id) = args.hgnc_id.as_ref() {
                    if &record.hgnc_id != hgnc_id {
                        tracing::debug!(" skipping record {:?}", &record);
                        continue;
                    }
                }
                print_values(&mut out_writer, args.out_format, record)?;
            }
            iter.next();
        } else {
            break;
        }
    }
    // An iterator error invalidates the iterator, which also ends the loop above;
    // report it instead of treating it as the end of data.
    iter.status()?;
    // Flush explicitly: dropping a buffered writer silently ignores write errors.
    std::io::Write::flush(&mut out_writer)?;

    tracing::info!("... done querying in {:?}", before_query.elapsed());
    tracing::info!("All done. Have a nice day!");
    Ok(())
}
#[cfg(test)]
mod test {
    use std::str::FromStr;

    use super::*;

    use temp_testdir::TempDir;

    /// Build common and sub command arguments for a query against the TGDS
    /// example database, with output going to a fresh temporary directory.
    ///
    /// The returned `TempDir` must stay alive while the output file is used.
    fn args(query: ArgsQuery, hgnc_id: Option<String>) -> (common::cli::Args, Args, TempDir) {
        let temp = TempDir::default();
        let common = common::cli::Args {
            verbose: clap_verbosity_flag::Verbosity::new(1, 0),
        };
        let args = Args {
            path_rocksdb: String::from("tests/cons/example/tgds.tsv.db"),
            cf_name: String::from("ucsc_conservation"),
            out_file: temp.join("out").to_string_lossy().to_string(),
            out_format: common::cli::OutputFormat::Jsonl,
            hgnc_id,
            query,
        };
        (common, args, temp)
    }

    /// Query selection for the given range string.
    fn range_query(range: &str) -> Result<ArgsQuery, anyhow::Error> {
        Ok(ArgsQuery {
            range: Some(spdi::Range::from_str(range)?),
            all: false,
        })
    }

    /// Query selection covering all records.
    fn all_query() -> ArgsQuery {
        ArgsQuery {
            range: None,
            all: true,
        }
    }

    /// Run the query and return what was written to the output file.
    fn query_output(query: ArgsQuery, hgnc_id: Option<&str>) -> Result<String, anyhow::Error> {
        let (common, args, _temp) = args(query, hgnc_id.map(String::from));
        run(&common, &args)?;
        Ok(std::fs::read_to_string(&args.out_file)?)
    }

    #[test]
    fn smoke_query_range_without_hgnc_id() -> Result<(), anyhow::Error> {
        let out_data = query_output(range_query("GRCh37:13:95248336:95248351")?, None)?;
        insta::assert_snapshot!(&out_data);
        Ok(())
    }

    #[test]
    fn smoke_query_all_without_hgnc_id_result() -> Result<(), anyhow::Error> {
        let out_data = query_output(all_query(), None)?;
        insta::assert_snapshot!(&out_data);
        Ok(())
    }

    #[test]
    fn smoke_query_range_with_hgnc_id_result() -> Result<(), anyhow::Error> {
        let out_data = query_output(
            range_query("GRCh37:13:95248336:95248351")?,
            Some("HGNC:20324"),
        )?;
        insta::assert_snapshot!(&out_data);
        Ok(())
    }

    #[test]
    fn smoke_query_range_with_hgnc_id_no_result() -> Result<(), anyhow::Error> {
        let out_data = query_output(
            range_query("GRCh37:13:95248334:95248351")?,
            Some("nonexisting"),
        )?;
        insta::assert_snapshot!(&out_data);
        Ok(())
    }

    #[test]
    fn smoke_query_all_with_hgnc_id_no_result() -> Result<(), anyhow::Error> {
        let out_data = query_output(all_query(), Some("nonexisting"))?;
        insta::assert_snapshot!(&out_data);
        Ok(())
    }
}