use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use hgvs::data::uta_sr::{Config as UtaSrConfig, Provider};
use hgvs::mapper::variant::{Config as MapperConfig, Mapper};
use hgvs::normalizer::{Config as NormalizerConfig, Direction, Normalizer};
use hgvs::parser::{HgvsVariant, NoRef};
use hgvs::validator::IntrinsicValidator;
use crate::FerroError;
/// Result triple produced by the hgvs-rs normalization runners:
/// per-pattern parse results (in input order), the wall-clock time spent
/// normalizing, and a tally of errors keyed by category string.
pub type HgvsRsNormalizeResult = (
    Vec<crate::benchmark::ParseResult>,
    std::time::Duration,
    std::collections::HashMap<String, usize>,
);
/// Configuration for the hgvs-rs backend (UTA database + SeqRepo).
#[derive(Debug, Clone)]
pub struct HgvsRsConfig {
    /// PostgreSQL connection URL for the UTA transcript database.
    pub uta_db_url: String,
    /// Schema name inside the UTA database (versioned, e.g. "uta_20210129b").
    pub uta_db_schema: String,
    /// Filesystem path to a local SeqRepo instance.
    pub seqrepo_path: String,
    /// Optional path to a tab-separated LRG -> RefSeq transcript mapping file.
    pub lrg_mapping_file: Option<String>,
    /// When true, use `super::inmemory_provider::InMemoryProvider` instead of
    /// a direct UTA/SeqRepo provider.
    pub in_memory: bool,
}
impl Default for HgvsRsConfig {
fn default() -> Self {
Self {
uta_db_url: "postgresql://anonymous:anonymous@localhost:5432/uta".to_string(),
uta_db_schema: "uta_20210129b".to_string(),
seqrepo_path: "/usr/local/share/seqrepo/2021-01-29".to_string(),
lrg_mapping_file: None,
in_memory: false,
}
}
}
/// In-memory lookup table from LRG transcript accessions to RefSeq
/// transcript accessions, loaded from a tab-separated mapping file.
#[derive(Debug, Clone, Default)]
pub struct LrgMapping {
    // Keys are full LRG transcript accessions (LRG id + transcript suffix,
    // e.g. "LRG_199t1"); values are RefSeq transcript accessions.
    mapping: std::collections::HashMap<String, String>,
}
impl LrgMapping {
    /// Loads LRG -> RefSeq transcript mappings from a tab-separated file.
    ///
    /// Column layout (0-based): LRG id in column 0, LRG transcript suffix in
    /// column 3, RefSeq transcript in column 4. Comment lines (`#`-prefixed),
    /// blank lines, rows with fewer than 5 fields, and rows with no RefSeq
    /// transcript (empty or `-`) are skipped.
    ///
    /// # Errors
    /// Returns `FerroError::Io` if the file cannot be opened or read.
    pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, FerroError> {
        use std::io::BufRead;
        let file = std::fs::File::open(path.as_ref()).map_err(|e| FerroError::Io {
            msg: format!("Failed to open LRG mapping file: {}", e),
        })?;
        let mut reader = std::io::BufReader::new(file);
        let mut mapping = std::collections::HashMap::new();
        // Reuse a single buffer instead of allocating a fresh String per line
        // (BufRead::lines() allocates one String for every line).
        let mut buf = String::new();
        loop {
            buf.clear();
            let bytes_read = reader.read_line(&mut buf).map_err(|e| FerroError::Io {
                msg: format!("Failed to read LRG mapping line: {}", e),
            })?;
            if bytes_read == 0 {
                break; // EOF
            }
            // Match lines() semantics exactly: strip one trailing "\n" or "\r\n".
            let line = buf.strip_suffix('\n').unwrap_or(&buf);
            let line = line.strip_suffix('\r').unwrap_or(line);
            if line.starts_with('#') || line.trim().is_empty() {
                continue;
            }
            let fields: Vec<&str> = line.split('\t').collect();
            if fields.len() < 5 {
                continue;
            }
            let (lrg_id, lrg_transcript, refseq_transcript) = (fields[0], fields[3], fields[4]);
            if refseq_transcript.is_empty() || refseq_transcript == "-" {
                continue;
            }
            // Key is the concatenated accession, e.g. "LRG_199" + "t1".
            mapping.insert(
                format!("{}{}", lrg_id, lrg_transcript),
                refseq_transcript.to_string(),
            );
        }
        Ok(Self { mapping })
    }

    /// Rewrites an LRG-accessioned HGVS pattern to its RefSeq equivalent.
    ///
    /// Returns the (possibly translated) pattern, plus the original LRG
    /// accession when a translation occurred so callers can restore it on
    /// output. Patterns without a known transcript-level LRG accession are
    /// returned unchanged with `None`.
    pub fn translate_pattern(&self, pattern: &str) -> (String, Option<String>) {
        if let Some((accession, rest)) = pattern.split_once(':') {
            // Only transcript-level LRG accessions (containing a 't') can map.
            if accession.starts_with("LRG_") && accession.contains('t') {
                if let Some(refseq) = self.mapping.get(accession) {
                    return (
                        format!("{}:{}", refseq, rest),
                        Some(accession.to_string()),
                    );
                }
            }
        }
        (pattern.to_string(), None)
    }
}
/// Outcome of normalizing a single HGVS string via hgvs-rs.
#[derive(Debug, Clone)]
pub struct HgvsRsResult {
    /// The input string, echoed back unchanged.
    pub input: String,
    /// True when both parsing and normalization succeeded.
    pub success: bool,
    /// Normalized HGVS rendering (reference bases suppressed) on success.
    pub output: Option<String>,
    /// Human-readable error description on failure (parse error,
    /// normalization error, or caught panic).
    pub error: Option<String>,
}
/// Panic-safe wrapper around the hgvs-rs variant `Mapper` used to
/// normalize HGVS strings.
pub struct HgvsRsNormalizer {
    // Variant mapper; also owns access to the data provider.
    mapper: Mapper,
}
impl HgvsRsNormalizer {
    /// Builds a normalizer backed by a UTA/SeqRepo provider created from
    /// `config`.
    ///
    /// # Errors
    /// Returns `FerroError::Io` when the provider cannot be created.
    pub fn new(config: &HgvsRsConfig) -> Result<Self, FerroError> {
        let provider_cfg = UtaSrConfig {
            db_url: config.uta_db_url.clone(),
            db_schema: config.uta_db_schema.clone(),
            seqrepo_path: config.seqrepo_path.clone(),
        };
        let provider = Arc::new(Provider::new(provider_cfg).map_err(|e| FerroError::Io {
            msg: format!("Failed to create hgvs-rs provider: {}", e),
        })?);
        Ok(Self {
            mapper: Mapper::new(&MapperConfig::default(), provider),
        })
    }

    /// Builds a normalizer around an already-constructed data provider
    /// (used for the in-memory provider).
    pub fn with_provider(
        provider: Arc<dyn hgvs::data::interface::Provider + Send + Sync>,
    ) -> Result<Self, FerroError> {
        Ok(Self {
            mapper: Mapper::new(&MapperConfig::default(), provider),
        })
    }

    /// Parses and normalizes one HGVS string.
    ///
    /// Panics raised inside hgvs-rs are caught and reported as an error
    /// result so a single bad variant cannot abort a whole batch.
    pub fn normalize(&self, hgvs_str: &str) -> HgvsRsResult {
        use std::panic::{self, AssertUnwindSafe};
        let input = hgvs_str.to_string();
        // Parse first; a parse failure needs no provider access at all.
        let variant = match HgvsVariant::from_str(hgvs_str) {
            Ok(v) => v,
            Err(e) => {
                return HgvsRsResult {
                    input,
                    success: false,
                    output: None,
                    error: Some(format!("Parse error: {}", e)),
                };
            }
        };
        // The normalizer borrows the mapper and provider, so it is rebuilt
        // on every call.
        let normalizer = Normalizer::new(
            &self.mapper,
            self.mapper.provider(),
            Arc::new(IntrinsicValidator::new(true)),
            NormalizerConfig {
                shuffle_direction: Direction::FiveToThree,
                cross_boundaries: false,
                replace_reference: true,
                ..Default::default()
            },
        );
        let outcome = panic::catch_unwind(AssertUnwindSafe(|| normalizer.normalize(&variant)));
        match outcome {
            Ok(Ok(normalized)) => HgvsRsResult {
                input,
                success: true,
                // NoRef renders the variant without reference bases.
                output: Some(format!("{}", NoRef(&normalized))),
                error: None,
            },
            Ok(Err(e)) => HgvsRsResult {
                input,
                success: false,
                output: None,
                error: Some(format!("Normalization error: {}", e)),
            },
            Err(payload) => {
                // Recover a readable message from the panic payload when possible.
                let panic_msg = payload
                    .downcast_ref::<&str>()
                    .map(|s| s.to_string())
                    .or_else(|| payload.downcast_ref::<String>().cloned())
                    .unwrap_or_else(|| "unknown panic".to_string());
                HgvsRsResult {
                    input,
                    success: false,
                    output: None,
                    error: Some(format!("Normalization panic: {}", panic_msg)),
                }
            }
        }
    }

    /// Normalizes each input string in order, yielding one result per input.
    pub fn normalize_batch(&self, hgvs_strs: &[String]) -> Vec<HgvsRsResult> {
        let mut results = Vec::with_capacity(hgvs_strs.len());
        for s in hgvs_strs {
            results.push(self.normalize(s));
        }
        results
    }
}
/// Probes whether the hgvs-rs backend can be initialized with `config`
/// (i.e. UTA and SeqRepo are reachable).
///
/// # Errors
/// Returns `FerroError::Io` when provider creation fails.
pub fn check_hgvs_rs_available(config: &HgvsRsConfig) -> Result<(), FerroError> {
    let probe_cfg = UtaSrConfig {
        db_url: config.uta_db_url.clone(),
        db_schema: config.uta_db_schema.clone(),
        seqrepo_path: config.seqrepo_path.clone(),
    };
    // The provider is constructed only to verify connectivity, then dropped.
    match Provider::new(probe_cfg) {
        Ok(_) => Ok(()),
        Err(e) => Err(FerroError::Io {
            msg: format!("hgvs-rs not available: {}", e),
        }),
    }
}
/// Normalizes every pattern serially with a single hgvs-rs normalizer.
///
/// When `config.lrg_mapping_file` is set, LRG accessions are translated to
/// RefSeq before normalization and restored in the output afterwards.
/// Returns per-pattern results (input order), elapsed time, and an
/// error-category tally.
///
/// # Errors
/// Returns `FerroError` if the LRG mapping file or the normalizer/provider
/// cannot be created.
pub fn run_hgvs_rs_normalize(
    patterns: &[String],
    config: &HgvsRsConfig,
) -> Result<HgvsRsNormalizeResult, crate::FerroError> {
    use std::collections::HashMap;
    use std::time::Instant;
    // Optional LRG -> RefSeq accession translation table.
    let lrg_mapping = match config.lrg_mapping_file {
        Some(ref path) => Some(LrgMapping::load(path)?),
        None => None,
    };
    // The timed region includes normalizer construction.
    let start = Instant::now();
    let normalizer = if config.in_memory {
        let im_config = super::inmemory_provider::InMemoryProviderConfig {
            uta_db_url: config.uta_db_url.clone(),
            uta_db_schema: config.uta_db_schema.clone(),
            seqrepo_path: Some(config.seqrepo_path.clone()),
        };
        let im_provider = super::inmemory_provider::InMemoryProvider::new(&im_config)
            .map_err(|e| FerroError::Io {
                msg: format!("Failed to create in-memory provider: {}", e),
            })?;
        HgvsRsNormalizer::with_provider(Arc::new(im_provider))?
    } else {
        HgvsRsNormalizer::new(config)?
    };
    let mut results = Vec::with_capacity(patterns.len());
    for pattern in patterns {
        // Translate LRG accessions to RefSeq before normalizing.
        let (translated, original_lrg) = match lrg_mapping {
            Some(ref mapping) => mapping.translate_pattern(pattern),
            None => (pattern.clone(), None),
        };
        let result = normalizer.normalize(&translated);
        // On success, restore the original LRG accession in the output.
        let output = match (&original_lrg, result.success) {
            (Some(lrg), true) => result.output.map(|o| match o.find(':') {
                Some(colon_pos) => format!("{}{}", lrg, &o[colon_pos..]),
                None => o,
            }),
            _ => result.output,
        };
        results.push(crate::benchmark::ParseResult {
            input: pattern.clone(),
            success: result.success,
            output,
            error: result.error.clone(),
            error_category: result.error.map(|e| categorize_hgvs_rs_error(&e)),
            ref_mismatch: None,
            details: None,
        });
    }
    let elapsed = start.elapsed();
    // Tally error categories across all results.
    let mut error_counts: HashMap<String, usize> = HashMap::new();
    for result in &results {
        if let Some(ref category) = result.error_category {
            *error_counts.entry(category.clone()).or_default() += 1;
        }
    }
    Ok((results, elapsed, error_counts))
}
/// Parallel variant of `run_hgvs_rs_normalize`: splits `patterns` into one
/// chunk per worker and normalizes chunks on the rayon thread pool,
/// re-assembling results in the original input order.
///
/// Each worker gets its own `HgvsRsNormalizer`; in the in-memory case all
/// normalizers share one provider `Arc`, otherwise each opens its own
/// connection. Normalizer-creation failures are not fatal: affected chunks
/// produce per-pattern `CONNECTION_ERROR` results instead.
///
/// # Errors
/// Returns `FerroError` if the LRG mapping file or the shared in-memory
/// provider fails to load.
pub fn run_hgvs_rs_normalize_parallel(
    patterns: &[String],
    config: &HgvsRsConfig,
    workers: usize,
) -> Result<HgvsRsNormalizeResult, crate::FerroError> {
    use rayon::prelude::*;
    use std::collections::HashMap;
    use std::sync::Mutex;
    use std::time::Instant;
    // A single worker gains nothing from the pool; take the serial path.
    if workers <= 1 {
        return run_hgvs_rs_normalize(patterns, config);
    }
    // Optional LRG -> RefSeq accession translation table, loaded up front.
    let lrg_mapping = if let Some(ref path) = config.lrg_mapping_file {
        Some(LrgMapping::load(path)?)
    } else {
        None
    };
    eprintln!(
        "Creating {} hgvs-rs normalizers (one per worker)...",
        workers
    );
    let init_start = Instant::now();
    // One normalizer per worker. In-memory mode builds a single provider and
    // shares it via Arc::clone; otherwise each map call constructs its own
    // provider. Creation failures are kept as Err entries and handled below.
    let normalizers: Vec<_> = if config.in_memory {
        let im_config = super::inmemory_provider::InMemoryProviderConfig {
            uta_db_url: config.uta_db_url.clone(),
            uta_db_schema: config.uta_db_schema.clone(),
            seqrepo_path: Some(config.seqrepo_path.clone()),
        };
        let provider = Arc::new(
            super::inmemory_provider::InMemoryProvider::new(&im_config).map_err(|e| {
                FerroError::Io {
                    msg: format!("Failed to create in-memory provider: {}", e),
                }
            })?,
        );
        (0..workers)
            .map(|_| {
                HgvsRsNormalizer::with_provider(provider.clone())
                    .map_err(|e| format!("Failed to create normalizer: {}", e))
            })
            .collect()
    } else {
        (0..workers)
            .map(|_| {
                HgvsRsNormalizer::new(config)
                    .map_err(|e| format!("Failed to create normalizer: {}", e))
            })
            .collect()
    };
    let init_elapsed = init_start.elapsed();
    eprintln!(
        "Normalizer initialization took {:.3}s ({:.3}s per worker)",
        init_elapsed.as_secs_f64(),
        init_elapsed.as_secs_f64() / workers as f64,
    );
    // Pool of normalizers behind mutexes; a failed creation becomes `None`
    // and surfaces later as per-pattern CONNECTION_ERROR results.
    let normalizer_pool: Vec<Mutex<Option<HgvsRsNormalizer>>> = normalizers
        .into_iter()
        .map(|r| match r {
            Ok(n) => Mutex::new(Some(n)),
            Err(_) => Mutex::new(None),
        })
        .collect();
    // Round-robin counter assigning each chunk a pool slot.
    let next_idx = std::sync::atomic::AtomicUsize::new(0);
    // Initialization time is deliberately excluded from the timed region.
    let start = Instant::now();
    // Ceiling division so every pattern lands in one of ~`workers` chunks.
    let chunk_size = patterns.len().div_ceil(workers).max(1);
    let chunk_results: Vec<Vec<(usize, crate::benchmark::ParseResult)>> = patterns
        .chunks(chunk_size)
        .enumerate()
        .collect::<Vec<_>>()
        .into_par_iter()
        .map(|(chunk_idx, chunk)| {
            // Claim a normalizer; the lock is held for the whole chunk.
            let idx =
                next_idx.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % normalizer_pool.len();
            let guard = normalizer_pool[idx].lock().unwrap();
            let normalizer = match guard.as_ref() {
                Some(n) => n,
                None => {
                    // Normalizer creation failed earlier: emit an error
                    // result for every pattern in this chunk.
                    return chunk
                        .iter()
                        .enumerate()
                        .map(|(i, pattern)| {
                            // Position of this pattern in the original slice.
                            let global_idx = chunk_idx * chunk_size + i;
                            (
                                global_idx,
                                crate::benchmark::ParseResult {
                                    input: pattern.clone(),
                                    success: false,
                                    output: None,
                                    error: Some("Failed to create normalizer".to_string()),
                                    error_category: Some("CONNECTION_ERROR".to_string()),
                                    ref_mismatch: None,
                                    details: None,
                                },
                            )
                        })
                        .collect();
                }
            };
            chunk
                .iter()
                .enumerate()
                .map(|(i, pattern)| {
                    // Position of this pattern in the original slice.
                    let global_idx = chunk_idx * chunk_size + i;
                    // Translate LRG accessions to RefSeq before normalizing.
                    let (translated_pattern, original_lrg) = if let Some(ref mapping) = lrg_mapping
                    {
                        mapping.translate_pattern(pattern)
                    } else {
                        (pattern.clone(), None)
                    };
                    let hgvs_result = normalizer.normalize(&translated_pattern);
                    // On success, restore the original LRG accession in the
                    // output (swap the accession before the first ':').
                    let output = if original_lrg.is_some() && hgvs_result.success {
                        hgvs_result.output.map(|o| {
                            if let Some(ref lrg) = original_lrg {
                                if let Some(colon_pos) = o.find(':') {
                                    format!("{}{}", lrg, &o[colon_pos..])
                                } else {
                                    o
                                }
                            } else {
                                o
                            }
                        })
                    } else {
                        hgvs_result.output
                    };
                    (
                        global_idx,
                        crate::benchmark::ParseResult {
                            input: pattern.clone(),
                            success: hgvs_result.success,
                            output,
                            error: hgvs_result.error.clone(),
                            error_category: hgvs_result.error.map(|e| categorize_hgvs_rs_error(&e)),
                            ref_mismatch: None,
                            details: None,
                        },
                    )
                })
                .collect()
        })
        .collect();
    let elapsed = start.elapsed();
    // Flatten all chunk outputs and re-sort by the recorded global index so
    // results come back in input order regardless of chunk completion order.
    let mut indexed_results: Vec<(usize, crate::benchmark::ParseResult)> =
        chunk_results.into_iter().flatten().collect();
    indexed_results.sort_by_key(|(idx, _)| *idx);
    let results: Vec<crate::benchmark::ParseResult> =
        indexed_results.into_iter().map(|(_, r)| r).collect();
    // Tally error categories across all results.
    let mut error_counts: HashMap<String, usize> = HashMap::new();
    for result in &results {
        if let Some(ref category) = result.error_category {
            *error_counts.entry(category.clone()).or_insert(0) += 1;
        }
    }
    Ok((results, elapsed, error_counts))
}
/// Buckets an hgvs-rs error message into a coarse category string for
/// benchmark reporting. Unrecognized messages fall into "OTHER".
fn categorize_hgvs_rs_error(error: &str) -> String {
    // Ordered rule table: the first rule whose needle appears in the message
    // wins, mirroring the precedence of the original if/else chain.
    const RULES: &[(&[&str], &str)] = &[
        (&["Parse error"], "PARSE_ERROR"),
        (&["panic", "Panic"], "PANIC"),
        (&["validation", "Validation"], "VALIDATION_ERROR"),
        (&["connection", "Connection"], "CONNECTION_ERROR"),
        (&["SeqRepo", "sequence"], "SEQREPO_ERROR"),
        (&["transcript", "Transcript"], "TRANSCRIPT_ERROR"),
        (&["intronic", "Intronic"], "INTRONIC_ERROR"),
        (&["protein", "Protein"], "PROTEIN_ERROR"),
    ];
    RULES
        .iter()
        .find(|(needles, _)| needles.iter().any(|n| error.contains(n)))
        .map_or_else(|| "OTHER".to_string(), |(_, cat)| (*cat).to_string())
}
/// Returns true when `path` looks like a usable SeqRepo instance: the
/// directory exists and contains both `aliases.sqlite3` and a `sequences`
/// entry.
pub fn check_seqrepo_path(path: &Path) -> bool {
    path.exists()
        && path.join("aliases.sqlite3").exists()
        && path.join("sequences").exists()
}
/// Parses a PostgreSQL connection URL into `(host, port, dbname, user)`.
///
/// Accepts both the `postgresql://` and the common `postgres://` scheme.
/// Credentials are split at the LAST `@` so passwords containing `@`
/// still parse correctly. A missing or unparseable port falls back to the
/// PostgreSQL default, 5432. Returns `None` when the scheme, `@`, or the
/// `/dbname` separator is absent.
pub fn parse_uta_url(url: &str) -> Option<(String, u16, String, String)> {
    let rest = url
        .strip_prefix("postgresql://")
        .or_else(|| url.strip_prefix("postgres://"))?;
    // Last '@' separates "user[:password]" from "host[:port]/dbname".
    let (creds, host_part) = rest.rsplit_once('@')?;
    let (host_port, dbname) = host_part.split_once('/')?;
    let (host, port) = match host_port.split_once(':') {
        // An unparseable port falls back to the default rather than failing.
        Some((h, p)) => (h, p.parse().unwrap_or(5432)),
        None => (host_port, 5432),
    };
    // User is everything before the first ':' in the credentials.
    let user = creds.split_once(':').map_or(creds, |(u, _)| u);
    Some((host.to_string(), port, dbname.to_string(), user.to_string()))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A local-host UTA URL parses into its four components.
    #[test]
    fn test_parse_uta_url() {
        let url = "postgresql://anonymous:anonymous@localhost:5432/uta";
        let result = parse_uta_url(url);
        assert!(result.is_some());
        let (host, port, dbname, user) = result.unwrap();
        assert_eq!(host, "localhost");
        assert_eq!(port, 5432);
        assert_eq!(dbname, "uta");
        assert_eq!(user, "anonymous");
    }

    /// A remote host name is preserved verbatim.
    #[test]
    fn test_parse_uta_url_remote() {
        let url = "postgresql://anonymous:anonymous@uta.biocommons.org:5432/uta";
        let result = parse_uta_url(url);
        assert!(result.is_some());
        let (host, port, dbname, user) = result.unwrap();
        assert_eq!(host, "uta.biocommons.org");
        assert_eq!(port, 5432);
        assert_eq!(dbname, "uta");
        assert_eq!(user, "anonymous");
    }

    /// Default config points at a local UTA with the pinned schema version.
    #[test]
    fn test_default_config() {
        let config = HgvsRsConfig::default();
        assert!(config.uta_db_url.contains("localhost"));
        assert_eq!(config.uta_db_schema, "uta_20210129b");
    }
}