use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyDict;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::path::Path;
use std::sync::Arc;
use crate::backtranslate::{Backtranslator, CodonChange, CodonTable};
use crate::batch::{BatchProcessor, BatchProgress, BatchResult};
use crate::convert::CoordinateMapper;
use crate::coords::{OneBasedPos, ZeroBasedPos};
use crate::effect::{Consequence, EffectPredictor, Impact, ProteinEffect};
use crate::equivalence::{EquivalenceChecker, EquivalenceLevel, EquivalenceResult};
use crate::error_handling::{CorrectionWarning, ErrorConfig, ErrorMode, ErrorOverride, ErrorType};
use crate::hgvs::location::{AminoAcid, CdsPos, TxPos};
use crate::hgvs::variant::HgvsVariant;
use crate::mave::{is_mave_short_form, parse_mave_hgvs, MaveContext};
use crate::prepare::{check_references, prepare_references, PrepareConfig, ReferenceManifest};
use crate::python_helpers::{
get_indel_length, get_num_variants, get_substitution_bases, get_variant_edit_type,
get_variant_end, get_variant_offset, get_variant_reference, get_variant_start, is_frameshift,
is_identity, parse_direction, variant_type_str,
};
use crate::reference::provider::ReferenceProvider;
use crate::reference::transcript::{GenomeBuild, Strand};
use crate::reference::{MockProvider, MultiFastaProvider};
use crate::rsid::{format_rsid, parse_rsid as rust_parse_rsid, InMemoryRsIdLookup, RsIdResult};
use crate::spdi::{hgvs_to_spdi_simple, parse_spdi as rust_parse_spdi, spdi_to_hgvs, SpdiVariant};
use crate::vcf::{vcf_to_genomic_hgvs as rust_vcf_to_hgvs, VcfRecord};
use crate::{parse_hgvs, NormalizeConfig, Normalizer};
/// Reference backend used by the Python wrapper classes in this module.
///
/// A single enum over the concrete provider types, so one type can implement
/// `ReferenceProvider` regardless of where the reference data came from.
#[derive(Clone)]
enum PyProvider {
/// A `MockProvider` (the constructors in this file build it from a JSON file or the built-in test data).
Mock(MockProvider),
/// A `MultiFastaProvider` held behind an `Arc`, so cloning this variant clones the pointer only.
MultiFasta(Arc<MultiFastaProvider>),
}
// Every trait method below is a pure dispatch: it matches on the variant and
// forwards the call, unchanged, to the wrapped provider.
impl ReferenceProvider for PyProvider {
/// Fetches the transcript identified by `id` from the wrapped provider.
fn get_transcript(
&self,
id: &str,
) -> Result<crate::reference::transcript::Transcript, crate::error::FerroError> {
match self {
PyProvider::Mock(p) => p.get_transcript(id),
PyProvider::MultiFasta(p) => p.get_transcript(id),
}
}
/// Fetches sequence for `id`; `start`/`end` are passed through untouched, so their
/// coordinate convention is whatever the wrapped provider defines.
fn get_sequence(
&self,
id: &str,
start: u64,
end: u64,
) -> Result<String, crate::error::FerroError> {
match self {
PyProvider::Mock(p) => p.get_sequence(id, start, end),
PyProvider::MultiFasta(p) => p.get_sequence(id, start, end),
}
}
/// Fetches genomic sequence for `contig`; bounds are forwarded as given.
fn get_genomic_sequence(
&self,
contig: &str,
start: u64,
end: u64,
) -> Result<String, crate::error::FerroError> {
match self {
PyProvider::Mock(p) => p.get_genomic_sequence(contig, start, end),
PyProvider::MultiFasta(p) => p.get_genomic_sequence(contig, start, end),
}
}
/// Reports the wrapped provider's answer to `has_genomic_data`.
fn has_genomic_data(&self) -> bool {
match self {
PyProvider::Mock(p) => p.has_genomic_data(),
PyProvider::MultiFasta(p) => p.has_genomic_data(),
}
}
/// Fetches protein sequence for `accession`; bounds are forwarded as given.
fn get_protein_sequence(
&self,
accession: &str,
start: u64,
end: u64,
) -> Result<String, crate::error::FerroError> {
match self {
PyProvider::Mock(p) => p.get_protein_sequence(accession, start, end),
PyProvider::MultiFasta(p) => p.get_protein_sequence(accession, start, end),
}
}
/// Reports the wrapped provider's answer to `has_protein_data`.
fn has_protein_data(&self) -> bool {
match self {
PyProvider::Mock(p) => p.has_protein_data(),
PyProvider::MultiFasta(p) => p.has_protein_data(),
}
}
}
impl PyProvider {
    /// Builds the `Mock` variant from the JSON reference file at `path`.
    ///
    /// A loader failure is reported to Python as a `RuntimeError`.
    fn from_json(path: &Path) -> PyResult<Self> {
        match MockProvider::from_json(path) {
            Ok(mock) => Ok(Self::Mock(mock)),
            Err(err) => Err(PyRuntimeError::new_err(format!(
                "Failed to load reference: {}",
                err
            ))),
        }
    }

    /// Builds the `MultiFasta` variant from the manifest at `path`.
    ///
    /// A loader failure is reported to Python as a `RuntimeError`.
    fn from_manifest(path: &Path) -> PyResult<Self> {
        match MultiFastaProvider::from_manifest(path) {
            Ok(fasta) => Ok(Self::MultiFasta(Arc::new(fasta))),
            Err(err) => Err(PyRuntimeError::new_err(format!(
                "Failed to load manifest: {}",
                err
            ))),
        }
    }

    /// Builds the `Mock` variant from `MockProvider::with_test_data()`.
    fn test_data() -> Self {
        let mock = MockProvider::with_test_data();
        Self::Mock(mock)
    }
}
#[pyfunction]
fn parse(hgvs_string: &str) -> PyResult<PyHgvsVariant> {
match parse_hgvs(hgvs_string) {
Ok(variant) => Ok(PyHgvsVariant { inner: variant }),
Err(e) => Err(PyValueError::new_err(format!("Parse error: {}", e))),
}
}
#[pyfunction]
#[pyo3(signature = (hgvs_string, direction="3prime"))]
fn normalize(hgvs_string: &str, direction: &str) -> PyResult<String> {
let variant = parse_hgvs(hgvs_string)
.map_err(|e| PyValueError::new_err(format!("Parse error: {}", e)))?;
let config = NormalizeConfig::default().with_direction(parse_direction(direction));
let provider = MockProvider::with_test_data();
let normalizer = Normalizer::with_config(provider, config);
let normalized = normalizer
.normalize(&variant)
.map_err(|e| PyRuntimeError::new_err(format!("Normalization error: {}", e)))?;
Ok(normalized.to_string())
}
/// Python-visible `HgvsVariant` class: a thin wrapper around a parsed Rust `HgvsVariant`.
#[pyclass(name = "HgvsVariant", from_py_object)]
#[derive(Clone)]
pub struct PyHgvsVariant {
// The wrapped Rust variant; `pub(crate)` so other modules of this crate can access it.
pub(crate) inner: HgvsVariant,
}
#[pymethods]
impl PyHgvsVariant {
fn __str__(&self) -> String {
self.inner.to_string()
}
fn __repr__(&self) -> String {
format!("HgvsVariant('{}')", self.inner)
}
fn __eq__(&self, other: &Self) -> bool {
self.inner.to_string() == other.inner.to_string()
}
fn __hash__(&self) -> u64 {
let mut hasher = DefaultHasher::new();
self.inner.to_string().hash(&mut hasher);
hasher.finish()
}
#[getter]
fn variant_type(&self) -> &'static str {
variant_type_str(&self.inner)
}
#[getter]
fn reference(&self) -> PyResult<String> {
get_variant_reference(&self.inner).map_err(PyValueError::new_err)
}
#[getter]
fn edit_type(&self) -> &'static str {
get_variant_edit_type(&self.inner)
}
#[getter]
fn start(&self) -> Option<i64> {
get_variant_start(&self.inner)
}
#[getter]
fn end(&self) -> Option<i64> {
get_variant_end(&self.inner)
}
#[getter]
fn offset(&self) -> Option<i64> {
get_variant_offset(&self.inner)
}
#[getter]
fn substitution_bases(&self) -> Option<(char, char)> {
get_substitution_bases(&self.inner)
}
#[getter]
fn num_variants(&self) -> usize {
get_num_variants(&self.inner)
}
fn variants(&self) -> Vec<PyHgvsVariant> {
match &self.inner {
HgvsVariant::Allele(a) => a
.variants
.iter()
.map(|v| PyHgvsVariant { inner: v.clone() })
.collect(),
_ => vec![self.clone()],
}
}
#[getter]
fn indel_length(&self) -> Option<i64> {
get_indel_length(&self.inner)
}
fn is_identity(&self) -> bool {
is_identity(&self.inner)
}
fn is_frameshift(&self) -> bool {
is_frameshift(&self.inner)
}
fn is_genomic(&self) -> bool {
matches!(self.inner, HgvsVariant::Genome(_))
}
fn is_coding(&self) -> bool {
matches!(self.inner, HgvsVariant::Cds(_))
}
fn is_noncoding(&self) -> bool {
matches!(self.inner, HgvsVariant::Tx(_))
}
fn is_protein(&self) -> bool {
matches!(self.inner, HgvsVariant::Protein(_))
}
fn is_rna(&self) -> bool {
matches!(self.inner, HgvsVariant::Rna(_))
}
fn is_mitochondrial(&self) -> bool {
matches!(self.inner, HgvsVariant::Mt(_))
}
fn is_substitution(&self) -> bool {
get_variant_edit_type(&self.inner) == "substitution"
}
fn is_deletion(&self) -> bool {
get_variant_edit_type(&self.inner) == "deletion"
}
fn is_insertion(&self) -> bool {
get_variant_edit_type(&self.inner) == "insertion"
}
fn is_duplication(&self) -> bool {
get_variant_edit_type(&self.inner) == "duplication"
}
fn is_delins(&self) -> bool {
get_variant_edit_type(&self.inner) == "delins"
}
#[pyo3(signature = (direction="3prime"))]
fn normalize(&self, direction: &str) -> PyResult<PyHgvsVariant> {
let config = NormalizeConfig::default().with_direction(parse_direction(direction));
let provider = MockProvider::with_test_data();
let normalizer = Normalizer::with_config(provider, config);
let normalized = normalizer
.normalize(&self.inner)
.map_err(|e| PyRuntimeError::new_err(format!("Normalization error: {}", e)))?;
Ok(PyHgvsVariant { inner: normalized })
}
fn to_dict<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
let dict = PyDict::new(py);
dict.set_item("string", self.inner.to_string())?;
dict.set_item("variant_type", self.variant_type())?;
if let Ok(ref_str) = self.reference() {
dict.set_item("reference", ref_str)?;
}
dict.set_item("edit_type", self.edit_type())?;
if let Some(start) = get_variant_start(&self.inner) {
dict.set_item("start", start)?;
}
if let Some(end) = get_variant_end(&self.inner) {
dict.set_item("end", end)?;
}
if let Some(offset) = get_variant_offset(&self.inner) {
dict.set_item("offset", offset)?;
}
if let Some((ref_base, alt_base)) = get_substitution_bases(&self.inner) {
dict.set_item("ref_base", ref_base)?;
dict.set_item("alt_base", alt_base)?;
}
if let Some(indel_len) = get_indel_length(&self.inner) {
dict.set_item("indel_length", indel_len)?;
}
dict.set_item("num_variants", get_num_variants(&self.inner))?;
Ok(dict)
}
fn to_json(&self) -> PyResult<String> {
serde_json::to_string(&self.inner)
.map_err(|e| PyRuntimeError::new_err(format!("JSON serialization failed: {}", e)))
}
}
/// Python-visible `Normalizer` class: a reference provider plus a normalization config.
#[pyclass(name = "Normalizer")]
pub struct PyNormalizer {
// Reference data source; cloned into a fresh `Normalizer` on every normalize call.
provider: PyProvider,
// Normalization settings (the constructors set the shift direction on it).
config: NormalizeConfig,
}
#[pymethods]
impl PyNormalizer {
    /// Creates a normalizer.
    ///
    /// With `reference_json` the reference data is loaded from that file;
    /// without it the built-in test data is used. `direction` is interpreted
    /// by `parse_direction`.
    #[new]
    #[pyo3(signature = (reference_json=None, direction="3prime"))]
    fn new(reference_json: Option<&str>, direction: &str) -> PyResult<Self> {
        let provider = if let Some(json_path) = reference_json {
            PyProvider::from_json(Path::new(json_path))?
        } else {
            PyProvider::test_data()
        };
        let shift = parse_direction(direction);
        Ok(Self {
            provider,
            config: NormalizeConfig::default().with_direction(shift),
        })
    }

    /// Creates a normalizer whose reference data comes from a manifest file.
    #[staticmethod]
    #[pyo3(signature = (manifest_path, direction="3prime"))]
    fn from_manifest(manifest_path: &str, direction: &str) -> PyResult<Self> {
        let provider = PyProvider::from_manifest(Path::new(manifest_path))?;
        let shift = parse_direction(direction);
        Ok(Self {
            provider,
            config: NormalizeConfig::default().with_direction(shift),
        })
    }

    /// Parses an HGVS string; same behaviour as the module-level `parse`.
    fn parse(&self, hgvs_string: &str) -> PyResult<PyHgvsVariant> {
        parse(hgvs_string)
    }

    /// Normalizes an already parsed variant with this normalizer's provider
    /// and config. Raises `RuntimeError` when normalization fails.
    fn normalize_variant(&self, variant: &PyHgvsVariant) -> PyResult<PyHgvsVariant> {
        let engine = Normalizer::with_config(self.provider.clone(), self.config.clone());
        match engine.normalize(&variant.inner) {
            Ok(inner) => Ok(PyHgvsVariant { inner }),
            Err(err) => Err(PyRuntimeError::new_err(format!(
                "Normalization error: {}",
                err
            ))),
        }
    }

    /// Parses `hgvs_string`, normalizes it and returns the resulting string.
    ///
    /// Raises `ValueError` on a parse failure and `RuntimeError` when
    /// normalization fails.
    fn normalize(&self, hgvs_string: &str) -> PyResult<String> {
        let parsed = self.parse(hgvs_string)?;
        let normalized = self.normalize_variant(&parsed)?;
        Ok(normalized.inner.to_string())
    }
}
#[pyfunction]
fn parse_spdi(spdi_string: &str) -> PyResult<PySpdiVariant> {
match rust_parse_spdi(spdi_string) {
Ok(spdi) => Ok(PySpdiVariant { inner: spdi }),
Err(e) => Err(PyValueError::new_err(format!("Parse error: {}", e))),
}
}
#[pyfunction]
fn hgvs_to_spdi(variant: &PyHgvsVariant) -> PyResult<PySpdiVariant> {
match hgvs_to_spdi_simple(&variant.inner) {
Ok(spdi) => Ok(PySpdiVariant { inner: spdi }),
Err(e) => Err(PyValueError::new_err(format!("Conversion error: {}", e))),
}
}
#[pyfunction]
fn spdi_to_hgvs_variant(spdi: &PySpdiVariant) -> PyResult<PyHgvsVariant> {
match spdi_to_hgvs(&spdi.inner) {
Ok(variant) => Ok(PyHgvsVariant { inner: variant }),
Err(e) => Err(PyValueError::new_err(format!("Conversion error: {}", e))),
}
}
/// Python-visible `SpdiVariant` class: a thin wrapper around the Rust `SpdiVariant`.
#[pyclass(name = "SpdiVariant", from_py_object)]
#[derive(Clone)]
pub struct PySpdiVariant {
// The wrapped Rust SPDI value.
inner: SpdiVariant,
}
#[pymethods]
impl PySpdiVariant {
    /// Builds an SPDI variant from its four components, forwarded to
    /// `SpdiVariant::new` unchanged.
    #[new]
    fn new(sequence: &str, position: u64, deletion: &str, insertion: &str) -> Self {
        let inner = SpdiVariant::new(sequence, position, deletion, insertion);
        Self { inner }
    }

    /// `str(variant)`: the variant's string rendering.
    fn __str__(&self) -> String {
        self.inner.to_string()
    }

    /// `repr(variant)`: `SpdiVariant('<string form>')`.
    fn __repr__(&self) -> String {
        format!("SpdiVariant('{}')", self.inner)
    }

    /// Equality of the wrapped values.
    fn __eq__(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.inner, &other.inner);
        lhs == rhs
    }

    /// Hash of the wrapped value.
    fn __hash__(&self) -> u64 {
        let mut state = DefaultHasher::new();
        Hash::hash(&self.inner, &mut state);
        state.finish()
    }

    /// The `sequence` component.
    #[getter]
    fn sequence(&self) -> &str {
        let spdi = &self.inner;
        &spdi.sequence
    }

    /// The `position` component.
    #[getter]
    fn position(&self) -> u64 {
        let spdi = &self.inner;
        spdi.position
    }

    /// The `deletion` component.
    #[getter]
    fn deletion(&self) -> &str {
        let spdi = &self.inner;
        &spdi.deletion
    }

    /// The `insertion` component.
    #[getter]
    fn insertion(&self) -> &str {
        let spdi = &self.inner;
        &spdi.insertion
    }

    /// Forwards to `SpdiVariant::is_substitution`.
    fn is_substitution(&self) -> bool {
        self.inner.is_substitution()
    }

    /// True for a substitution whose `deletion` string has length 1.
    fn is_snv(&self) -> bool {
        let spdi = &self.inner;
        spdi.is_substitution() && spdi.deletion.len() == 1
    }

    /// Forwards to `SpdiVariant::is_deletion`.
    fn is_deletion(&self) -> bool {
        self.inner.is_deletion()
    }

    /// Forwards to `SpdiVariant::is_insertion`.
    fn is_insertion(&self) -> bool {
        self.inner.is_insertion()
    }

    /// Forwards to `SpdiVariant::is_delins`.
    fn is_delins(&self) -> bool {
        self.inner.is_delins()
    }

    /// Forwards to `SpdiVariant::is_identity`.
    fn is_identity(&self) -> bool {
        self.inner.is_identity()
    }

    /// Forwards to `SpdiVariant::variant_type`.
    fn variant_type(&self) -> &'static str {
        self.inner.variant_type()
    }

    /// Forwards to `SpdiVariant::to_one_based_position`.
    fn to_one_based_position(&self) -> u64 {
        self.inner.to_one_based_position()
    }

    /// Builds a `dict` with the four SPDI components plus `variant_type`.
    fn to_dict<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let spdi = &self.inner;
        let out = PyDict::new(py);
        out.set_item("sequence", &spdi.sequence)?;
        out.set_item("position", spdi.position)?;
        out.set_item("deletion", &spdi.deletion)?;
        out.set_item("insertion", &spdi.insertion)?;
        out.set_item("variant_type", spdi.variant_type())?;
        Ok(out)
    }
}
/// Python-visible `ZeroBasedPos` class: a thin wrapper around the Rust `ZeroBasedPos`.
#[pyclass(name = "ZeroBasedPos", from_py_object)]
#[derive(Clone)]
pub struct PyZeroBasedPos {
// The wrapped Rust position value.
inner: ZeroBasedPos,
}
#[pymethods]
impl PyZeroBasedPos {
    /// Wraps `pos` via `ZeroBasedPos::new`.
    #[new]
    fn new(pos: u64) -> Self {
        let inner = ZeroBasedPos::new(pos);
        Self { inner }
    }

    /// `str(pos)`: the position's string rendering.
    fn __str__(&self) -> String {
        self.inner.to_string()
    }

    /// `repr(pos)`: `ZeroBasedPos(<value>)`.
    fn __repr__(&self) -> String {
        let raw = self.inner.value();
        format!("ZeroBasedPos({})", raw)
    }

    /// Equality of the wrapped positions.
    fn __eq__(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.inner, &other.inner);
        lhs == rhs
    }

    /// Hash of the wrapped position.
    fn __hash__(&self) -> u64 {
        let mut state = DefaultHasher::new();
        Hash::hash(&self.inner, &mut state);
        state.finish()
    }

    /// `self < other` on the wrapped positions.
    fn __lt__(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.inner, &other.inner);
        lhs < rhs
    }

    /// `self <= other` on the wrapped positions.
    fn __le__(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.inner, &other.inner);
        lhs <= rhs
    }

    /// `self > other` on the wrapped positions.
    fn __gt__(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.inner, &other.inner);
        lhs > rhs
    }

    /// `self >= other` on the wrapped positions.
    fn __ge__(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.inner, &other.inner);
        lhs >= rhs
    }

    /// The raw numeric value.
    #[getter]
    fn value(&self) -> u64 {
        self.inner.value()
    }

    /// Converts through `ZeroBasedPos::to_one_based`.
    fn to_one_based(&self) -> PyOneBasedPos {
        let inner = self.inner.to_one_based();
        PyOneBasedPos { inner }
    }

    /// Forwards to `ZeroBasedPos::as_index`.
    fn as_index(&self) -> usize {
        self.inner.as_index()
    }
}
/// Python-visible `OneBasedPos` class: a thin wrapper around the Rust `OneBasedPos`.
#[pyclass(name = "OneBasedPos", from_py_object)]
#[derive(Clone)]
pub struct PyOneBasedPos {
// The wrapped Rust position value.
inner: OneBasedPos,
}
#[pymethods]
impl PyOneBasedPos {
    /// Wraps `pos` via `OneBasedPos::new`.
    ///
    /// Raises `ValueError` when `pos` is 0.
    #[new]
    fn new(pos: u64) -> PyResult<Self> {
        match pos {
            0 => Err(PyValueError::new_err("1-based position cannot be 0")),
            _ => Ok(Self {
                inner: OneBasedPos::new(pos),
            }),
        }
    }

    /// `str(pos)`: the position's string rendering.
    fn __str__(&self) -> String {
        self.inner.to_string()
    }

    /// `repr(pos)`: `OneBasedPos(<value>)`.
    fn __repr__(&self) -> String {
        let raw = self.inner.value();
        format!("OneBasedPos({})", raw)
    }

    /// Equality of the wrapped positions.
    fn __eq__(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.inner, &other.inner);
        lhs == rhs
    }

    /// Hash of the wrapped position.
    fn __hash__(&self) -> u64 {
        let mut state = DefaultHasher::new();
        Hash::hash(&self.inner, &mut state);
        state.finish()
    }

    /// `self < other` on the wrapped positions.
    fn __lt__(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.inner, &other.inner);
        lhs < rhs
    }

    /// `self <= other` on the wrapped positions.
    fn __le__(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.inner, &other.inner);
        lhs <= rhs
    }

    /// `self > other` on the wrapped positions.
    fn __gt__(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.inner, &other.inner);
        lhs > rhs
    }

    /// `self >= other` on the wrapped positions.
    fn __ge__(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.inner, &other.inner);
        lhs >= rhs
    }

    /// The raw numeric value.
    #[getter]
    fn value(&self) -> u64 {
        self.inner.value()
    }

    /// Converts through `OneBasedPos::to_zero_based`.
    fn to_zero_based(&self) -> PyZeroBasedPos {
        let inner = self.inner.to_zero_based();
        PyZeroBasedPos { inner }
    }
}
#[pyfunction]
fn hgvs_pos_to_index(pos: u64) -> usize {
crate::coords::hgvs_pos_to_index(pos)
}
#[pyfunction]
fn index_to_hgvs_pos(idx: usize) -> u64 {
crate::coords::index_to_hgvs_pos(idx)
}
/// Python-visible `EquivalenceLevel` enum; mirrors the Rust `EquivalenceLevel`
/// variant for variant, with explicit integer discriminants.
#[pyclass(name = "EquivalenceLevel", eq, eq_int, from_py_object)]
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum PyEquivalenceLevel {
Identical = 0,
NormalizedMatch = 1,
AccessionVersionDifference = 2,
// The only level for which `is_equivalent()` returns false.
NotEquivalent = 3,
}
/// One-to-one mapping from the Rust level to its Python mirror.
impl From<EquivalenceLevel> for PyEquivalenceLevel {
    fn from(level: EquivalenceLevel) -> Self {
        match level {
            EquivalenceLevel::Identical => Self::Identical,
            EquivalenceLevel::NormalizedMatch => Self::NormalizedMatch,
            EquivalenceLevel::AccessionVersionDifference => Self::AccessionVersionDifference,
            EquivalenceLevel::NotEquivalent => Self::NotEquivalent,
        }
    }
}
#[pymethods]
impl PyEquivalenceLevel {
fn __str__(&self) -> &'static str {
match self {
PyEquivalenceLevel::Identical => "Identical",
PyEquivalenceLevel::NormalizedMatch => "NormalizedMatch",
PyEquivalenceLevel::AccessionVersionDifference => "AccessionVersionDifference",
PyEquivalenceLevel::NotEquivalent => "NotEquivalent",
}
}
fn is_equivalent(&self) -> bool {
!matches!(self, PyEquivalenceLevel::NotEquivalent)
}
fn description(&self) -> &'static str {
match self {
PyEquivalenceLevel::Identical => "Identical representation",
PyEquivalenceLevel::NormalizedMatch => "Equivalent after normalization",
PyEquivalenceLevel::AccessionVersionDifference => {
"Same variant, different accession versions"
}
PyEquivalenceLevel::NotEquivalent => "Not equivalent",
}
}
}
/// Python-visible `EquivalenceResult`: the outcome of an equivalence check.
/// All fields are exposed to Python through generated getters (`#[pyo3(get)]`).
#[pyclass(name = "EquivalenceResult", from_py_object)]
#[derive(Clone)]
pub struct PyEquivalenceResult {
/// Equivalence level, converted from `EquivalenceResult::level`.
#[pyo3(get)]
pub level: PyEquivalenceLevel,
/// Copied from `EquivalenceResult::normalized_first`; may be `None`.
#[pyo3(get)]
pub normalized_first: Option<String>,
/// Copied from `EquivalenceResult::normalized_second`; may be `None`.
#[pyo3(get)]
pub normalized_second: Option<String>,
/// Copied from `EquivalenceResult::notes`.
#[pyo3(get)]
pub notes: Vec<String>,
}
#[pymethods]
impl PyEquivalenceResult {
    /// `repr(result)`: shows the level name and the equivalence verdict.
    fn __repr__(&self) -> String {
        let level_name = self.level.__str__();
        let verdict = self.is_equivalent();
        format!(
            "EquivalenceResult(level={}, is_equivalent={})",
            level_name, verdict
        )
    }

    /// True when `level` counts as equivalent (see `EquivalenceLevel.is_equivalent`).
    fn is_equivalent(&self) -> bool {
        let level = &self.level;
        level.is_equivalent()
    }
}
impl From<EquivalenceResult> for PyEquivalenceResult {
fn from(result: EquivalenceResult) -> Self {
Self {
level: result.level.into(),
normalized_first: result.normalized_first,
normalized_second: result.normalized_second,
notes: result.notes,
}
}
}
/// Python-visible `EquivalenceChecker` class.
#[pyclass(name = "EquivalenceChecker")]
pub struct PyEquivalenceChecker {
// Rust checker, constructed over this module's `PyProvider`.
checker: EquivalenceChecker<PyProvider>,
}
#[pymethods]
impl PyEquivalenceChecker {
    /// Creates a checker.
    ///
    /// With `reference_json` the reference data is loaded from that file;
    /// without it the built-in test data is used.
    #[new]
    #[pyo3(signature = (reference_json=None))]
    fn new(reference_json: Option<&str>) -> PyResult<Self> {
        let provider = if let Some(json_path) = reference_json {
            PyProvider::from_json(Path::new(json_path))?
        } else {
            PyProvider::test_data()
        };
        let checker = EquivalenceChecker::new(provider);
        Ok(Self { checker })
    }

    /// Creates a checker whose reference data comes from a manifest file.
    #[staticmethod]
    fn from_manifest(manifest_path: &str) -> PyResult<Self> {
        let provider = PyProvider::from_manifest(Path::new(manifest_path))?;
        let checker = EquivalenceChecker::new(provider);
        Ok(Self { checker })
    }

    /// Compares two variants. Raises `RuntimeError` when the check fails.
    fn check(&self, v1: &PyHgvsVariant, v2: &PyHgvsVariant) -> PyResult<PyEquivalenceResult> {
        match self.checker.check(&v1.inner, &v2.inner) {
            Ok(outcome) => Ok(outcome.into()),
            Err(err) => Err(PyRuntimeError::new_err(format!(
                "Equivalence check failed: {}",
                err
            ))),
        }
    }

    /// Forwards the whole list to the checker's `all_equivalent`.
    /// Raises `RuntimeError` when the check fails.
    fn all_equivalent(&self, variants: Vec<PyRef<PyHgvsVariant>>) -> PyResult<bool> {
        // The checker works on plain Rust values, so copy them out of the Python cells.
        let mut rust_variants: Vec<HgvsVariant> = Vec::new();
        for item in variants.iter() {
            rust_variants.push(item.inner.clone());
        }
        match self.checker.all_equivalent(&rust_variants) {
            Ok(verdict) => Ok(verdict),
            Err(err) => Err(PyRuntimeError::new_err(format!(
                "Equivalence check failed: {}",
                err
            ))),
        }
    }
}
/// Python-visible `Consequence` enum; mirrors the Rust `Consequence` variant
/// for variant, with explicit integer discriminants. Term names, IDs, impact
/// and descriptions are looked up on the Rust enum (see the methods on this class).
#[pyclass(name = "Consequence", eq, eq_int, from_py_object)]
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum PyConsequence {
TranscriptAblation = 0,
SpliceAcceptorVariant = 1,
SpliceDonorVariant = 2,
StopGained = 3,
FrameshiftVariant = 4,
StopLost = 5,
StartLost = 6,
MissenseVariant = 7,
InframeInsertion = 8,
InframeDeletion = 9,
ProteinAlteringVariant = 10,
SpliceRegionVariant = 11,
SynonymousVariant = 12,
StartRetainedVariant = 13,
StopRetainedVariant = 14,
FivePrimeUtrVariant = 15,
ThreePrimeUtrVariant = 16,
IntronVariant = 17,
CodingSequenceVariant = 18,
}
impl From<Consequence> for PyConsequence {
fn from(c: Consequence) -> Self {
match c {
Consequence::TranscriptAblation => PyConsequence::TranscriptAblation,
Consequence::SpliceAcceptorVariant => PyConsequence::SpliceAcceptorVariant,
Consequence::SpliceDonorVariant => PyConsequence::SpliceDonorVariant,
Consequence::StopGained => PyConsequence::StopGained,
Consequence::FrameshiftVariant => PyConsequence::FrameshiftVariant,
Consequence::StopLost => PyConsequence::StopLost,
Consequence::StartLost => PyConsequence::StartLost,
Consequence::MissenseVariant => PyConsequence::MissenseVariant,
Consequence::InframeInsertion => PyConsequence::InframeInsertion,
Consequence::InframeDeletion => PyConsequence::InframeDeletion,
Consequence::ProteinAlteringVariant => PyConsequence::ProteinAlteringVariant,
Consequence::SpliceRegionVariant => PyConsequence::SpliceRegionVariant,
Consequence::SynonymousVariant => PyConsequence::SynonymousVariant,
Consequence::StartRetainedVariant => PyConsequence::StartRetainedVariant,
Consequence::StopRetainedVariant => PyConsequence::StopRetainedVariant,
Consequence::FivePrimeUtrVariant => PyConsequence::FivePrimeUtrVariant,
Consequence::ThreePrimeUtrVariant => PyConsequence::ThreePrimeUtrVariant,
Consequence::IntronVariant => PyConsequence::IntronVariant,
Consequence::CodingSequenceVariant => PyConsequence::CodingSequenceVariant,
}
}
}
impl From<PyConsequence> for Consequence {
fn from(c: PyConsequence) -> Self {
match c {
PyConsequence::TranscriptAblation => Consequence::TranscriptAblation,
PyConsequence::SpliceAcceptorVariant => Consequence::SpliceAcceptorVariant,
PyConsequence::SpliceDonorVariant => Consequence::SpliceDonorVariant,
PyConsequence::StopGained => Consequence::StopGained,
PyConsequence::FrameshiftVariant => Consequence::FrameshiftVariant,
PyConsequence::StopLost => Consequence::StopLost,
PyConsequence::StartLost => Consequence::StartLost,
PyConsequence::MissenseVariant => Consequence::MissenseVariant,
PyConsequence::InframeInsertion => Consequence::InframeInsertion,
PyConsequence::InframeDeletion => Consequence::InframeDeletion,
PyConsequence::ProteinAlteringVariant => Consequence::ProteinAlteringVariant,
PyConsequence::SpliceRegionVariant => Consequence::SpliceRegionVariant,
PyConsequence::SynonymousVariant => Consequence::SynonymousVariant,
PyConsequence::StartRetainedVariant => Consequence::StartRetainedVariant,
PyConsequence::StopRetainedVariant => Consequence::StopRetainedVariant,
PyConsequence::FivePrimeUtrVariant => Consequence::FivePrimeUtrVariant,
PyConsequence::ThreePrimeUtrVariant => Consequence::ThreePrimeUtrVariant,
PyConsequence::IntronVariant => Consequence::IntronVariant,
PyConsequence::CodingSequenceVariant => Consequence::CodingSequenceVariant,
}
}
}
#[pymethods]
impl PyConsequence {
fn __str__(&self) -> &'static str {
Consequence::from(*self).so_term()
}
fn so_term(&self) -> &'static str {
Consequence::from(*self).so_term()
}
fn so_id(&self) -> &'static str {
Consequence::from(*self).so_id()
}
fn impact(&self) -> PyImpact {
Consequence::from(*self).impact().into()
}
fn description(&self) -> &'static str {
Consequence::from(*self).description()
}
}
/// Python-visible `Impact` enum; mirrors the Rust `Impact`.
/// The discriminants (and therefore the derived Rust ordering) increase from
/// `Modifier` to `High`.
#[pyclass(name = "Impact", eq, eq_int, from_py_object)]
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum PyImpact {
Modifier = 0,
Low = 1,
Moderate = 2,
High = 3,
}
/// One-to-one mapping from the Rust impact to its Python mirror.
impl From<Impact> for PyImpact {
    fn from(i: Impact) -> Self {
        match i {
            Impact::Modifier => Self::Modifier,
            Impact::Low => Self::Low,
            Impact::Moderate => Self::Moderate,
            Impact::High => Self::High,
        }
    }
}
#[pymethods]
impl PyImpact {
fn __str__(&self) -> &'static str {
match self {
PyImpact::High => "HIGH",
PyImpact::Moderate => "MODERATE",
PyImpact::Low => "LOW",
PyImpact::Modifier => "MODIFIER",
}
}
}
/// Python-visible `ProteinEffect` class: a thin wrapper around the Rust `ProteinEffect`.
#[pyclass(name = "ProteinEffect", from_py_object)]
#[derive(Clone)]
pub struct PyProteinEffect {
// The wrapped Rust effect (consequence list plus impact).
inner: ProteinEffect,
}
#[pymethods]
impl PyProteinEffect {
#[getter]
fn consequences(&self) -> Vec<PyConsequence> {
self.inner
.consequences
.iter()
.map(|c| (*c).into())
.collect()
}
#[getter]
fn impact(&self) -> PyImpact {
self.inner.impact.into()
}
fn is_high_impact(&self) -> bool {
self.inner.is_high_impact()
}
fn is_protein_altering(&self) -> bool {
self.inner.is_protein_altering()
}
fn __repr__(&self) -> String {
format!(
"ProteinEffect(impact={}, consequences={:?})",
self.inner.impact,
self.inner
.consequences
.iter()
.map(|c| c.so_term())
.collect::<Vec<_>>()
)
}
}
/// Python-visible `EffectPredictor` class.
#[pyclass(name = "EffectPredictor")]
pub struct PyEffectPredictor {
// The wrapped Rust predictor, created with `EffectPredictor::new()`.
inner: EffectPredictor,
}
#[pymethods]
impl PyEffectPredictor {
    /// Creates a predictor with `EffectPredictor::new()`.
    #[new]
    fn new() -> Self {
        let inner = EffectPredictor::new();
        Self { inner }
    }

    /// Classifies an amino-acid change at `position`.
    ///
    /// `ref_aa` and `alt_aa` are amino-acid codes accepted by
    /// `parse_amino_acid`; an unknown code raises `ValueError`.
    fn classify_amino_acid_change(
        &self,
        ref_aa: &str,
        alt_aa: &str,
        position: u64,
    ) -> PyResult<PyProteinEffect> {
        let reference = parse_amino_acid(ref_aa)?;
        let alternate = parse_amino_acid(alt_aa)?;
        let inner = self
            .inner
            .classify_amino_acid_change(&reference, &alternate, position);
        Ok(PyProteinEffect { inner })
    }

    /// Forwards `ref_len` and `alt_len` to `EffectPredictor::classify_indel`.
    fn classify_indel(&self, ref_len: usize, alt_len: usize) -> PyProteinEffect {
        let inner = self.inner.classify_indel(ref_len, alt_len);
        PyProteinEffect { inner }
    }

    /// Forwards `offset` to `EffectPredictor::classify_splice_variant`.
    fn classify_splice_variant(&self, offset: i64) -> PyProteinEffect {
        let inner = self.inner.classify_splice_variant(offset);
        PyProteinEffect { inner }
    }

    /// Forwards `is_5_prime` to `EffectPredictor::classify_utr_variant`.
    fn classify_utr_variant(&self, is_5_prime: bool) -> PyProteinEffect {
        let inner = self.inner.classify_utr_variant(is_5_prime);
        PyProteinEffect { inner }
    }
}
/// Maps a three-letter or one-letter amino-acid code to an `AminoAcid`.
///
/// Matching is case-insensitive (the input is upper-cased first). `"TER"`,
/// `"*"` and `"X"` all map to `AminoAcid::Ter`. Any other text yields a
/// `ValueError` quoting the original, un-normalized input.
fn parse_amino_acid(code: &str) -> PyResult<AminoAcid> {
    let normalized = code.to_uppercase();
    let amino_acid = match normalized.as_str() {
        "ALA" | "A" => AminoAcid::Ala,
        "ARG" | "R" => AminoAcid::Arg,
        "ASN" | "N" => AminoAcid::Asn,
        "ASP" | "D" => AminoAcid::Asp,
        "CYS" | "C" => AminoAcid::Cys,
        "GLN" | "Q" => AminoAcid::Gln,
        "GLU" | "E" => AminoAcid::Glu,
        "GLY" | "G" => AminoAcid::Gly,
        "HIS" | "H" => AminoAcid::His,
        "ILE" | "I" => AminoAcid::Ile,
        "LEU" | "L" => AminoAcid::Leu,
        "LYS" | "K" => AminoAcid::Lys,
        "MET" | "M" => AminoAcid::Met,
        "PHE" | "F" => AminoAcid::Phe,
        "PRO" | "P" => AminoAcid::Pro,
        "SER" | "S" => AminoAcid::Ser,
        "THR" | "T" => AminoAcid::Thr,
        "TRP" | "W" => AminoAcid::Trp,
        "TYR" | "Y" => AminoAcid::Tyr,
        "VAL" | "V" => AminoAcid::Val,
        "TER" | "*" | "X" => AminoAcid::Ter,
        "SEC" | "U" => AminoAcid::Sec,
        _ => {
            return Err(PyValueError::new_err(format!(
                "Unknown amino acid: {}",
                code
            )))
        }
    };
    Ok(amino_acid)
}
/// Python-visible `MaveContext` class: a thin wrapper around the Rust `MaveContext`.
#[pyclass(name = "MaveContext", from_py_object)]
#[derive(Clone)]
pub struct PyMaveContext {
// The wrapped Rust context holding the optional accessions and gene symbol.
inner: MaveContext,
}
#[pymethods]
impl PyMaveContext {
    /// Creates a context with `MaveContext::new()`.
    #[new]
    fn new() -> Self {
        let inner = MaveContext::new();
        Self { inner }
    }

    /// Returns a new context with the protein accession set; `self` is left untouched.
    fn with_protein_accession(&self, accession: &str) -> Self {
        let inner = self.inner.clone().with_protein_accession(accession);
        Self { inner }
    }

    /// Returns a new context with the coding accession set; `self` is left untouched.
    fn with_coding_accession(&self, accession: &str) -> Self {
        let inner = self.inner.clone().with_coding_accession(accession);
        Self { inner }
    }

    /// Returns a new context with the non-coding accession set; `self` is left untouched.
    fn with_noncoding_accession(&self, accession: &str) -> Self {
        let inner = self.inner.clone().with_noncoding_accession(accession);
        Self { inner }
    }

    /// Returns a new context with the genomic accession set; `self` is left untouched.
    fn with_genomic_accession(&self, accession: &str) -> Self {
        let inner = self.inner.clone().with_genomic_accession(accession);
        Self { inner }
    }

    /// Returns a new context with the gene symbol set; `self` is left untouched.
    fn with_gene_symbol(&self, symbol: &str) -> Self {
        let inner = self.inner.clone().with_gene_symbol(symbol);
        Self { inner }
    }

    /// Forwards to `MaveContext::has_accessions`.
    fn has_accessions(&self) -> bool {
        self.inner.has_accessions()
    }

    /// Asks the context whether it supports a coordinate type.
    ///
    /// Only the first character of `coord_type` is used; an empty string
    /// raises `ValueError`.
    fn supports_coordinate_type(&self, coord_type: &str) -> PyResult<bool> {
        match coord_type.chars().next() {
            Some(kind) => Ok(self.inner.supports_coordinate_type(kind)),
            None => Err(PyValueError::new_err(
                "coord_type must be a non-empty string",
            )),
        }
    }

    /// Copy of the non-coding accession, if set.
    #[getter]
    fn noncoding_accession(&self) -> Option<String> {
        let ctx = &self.inner;
        ctx.noncoding_accession.clone()
    }

    /// Copy of the protein accession, if set.
    #[getter]
    fn protein_accession(&self) -> Option<String> {
        let ctx = &self.inner;
        ctx.protein_accession.clone()
    }

    /// Copy of the coding accession, if set.
    #[getter]
    fn coding_accession(&self) -> Option<String> {
        let ctx = &self.inner;
        ctx.coding_accession.clone()
    }

    /// Copy of the genomic accession, if set.
    #[getter]
    fn genomic_accession(&self) -> Option<String> {
        let ctx = &self.inner;
        ctx.genomic_accession.clone()
    }

    /// Copy of the gene symbol, if set.
    #[getter]
    fn gene_symbol(&self) -> Option<String> {
        let ctx = &self.inner;
        ctx.gene_symbol.clone()
    }
}
#[pyfunction]
fn parse_mave_hgvs_variant(hgvs_string: &str, context: &PyMaveContext) -> PyResult<PyHgvsVariant> {
match parse_mave_hgvs(hgvs_string, &context.inner) {
Ok(variant) => Ok(PyHgvsVariant { inner: variant }),
Err(e) => Err(PyValueError::new_err(format!("Parse error: {}", e))),
}
}
/// Python entry point for `is_mave_short_form`; returns its verdict unchanged.
#[pyfunction]
fn is_mave_short_form_variant(hgvs_string: &str) -> bool {
    let short_form = is_mave_short_form(hgvs_string);
    short_form
}
/// Python-visible `BatchProgress`: a snapshot handed to progress callbacks.
/// All fields are exposed to Python through generated getters (`#[pyo3(get)]`).
#[pyclass(name = "BatchProgress", from_py_object)]
#[derive(Clone)]
pub struct PyBatchProgress {
/// Copied from `BatchProgress::processed`.
#[pyo3(get)]
pub processed: usize,
/// Copied from `BatchProgress::total`.
#[pyo3(get)]
pub total: usize,
/// Copied from `BatchProgress::success`.
#[pyo3(get)]
pub successes: usize,
/// Copied from `BatchProgress::errors`.
#[pyo3(get)]
pub failures: usize,
}
#[pymethods]
impl PyBatchProgress {
    /// Completion percentage: `processed / total * 100`.
    ///
    /// An empty batch (`total == 0`) reports 100.0 rather than dividing by zero.
    fn percent(&self) -> f64 {
        match self.total {
            0 => 100.0,
            total => (self.processed as f64 / total as f64) * 100.0,
        }
    }
}
/// Copies the counters out of the Rust progress value; note the renames
/// `success` -> `successes` and `errors` -> `failures`.
impl From<&BatchProgress> for PyBatchProgress {
    fn from(p: &BatchProgress) -> Self {
        let (processed, total) = (p.processed, p.total);
        let (successes, failures) = (p.success, p.errors);
        Self {
            processed,
            total,
            successes,
            failures,
        }
    }
}
/// Python-visible `BatchResult` class.
#[pyclass(name = "BatchResult")]
pub struct PyBatchResult {
// The wrapped Rust batch result, one item per input variant.
inner: BatchResult<HgvsVariant>,
}
#[pymethods]
impl PyBatchResult {
fn total(&self) -> usize {
self.inner.total()
}
fn success_count(&self) -> usize {
self.inner.success_count()
}
fn error_count(&self) -> usize {
self.inner.error_count()
}
fn success_rate(&self) -> f64 {
self.inner.success_rate()
}
fn successes(&self) -> Vec<PyHgvsVariant> {
self.inner
.results
.iter()
.filter_map(|r| match r {
crate::batch::ItemResult::Ok(v) => Some(PyHgvsVariant { inner: v.clone() }),
_ => None,
})
.collect()
}
fn errors(&self) -> Vec<(usize, String)> {
self.inner
.results
.iter()
.enumerate()
.filter_map(|(i, r)| match r {
crate::batch::ItemResult::Err { error, .. } => Some((i, error.to_string())),
_ => None,
})
.collect()
}
fn __repr__(&self) -> String {
format!(
"BatchResult(total={}, successes={}, errors={})",
self.total(),
self.success_count(),
self.error_count()
)
}
}
/// Python-visible `BatchProcessor` class.
#[pyclass(name = "BatchProcessor")]
pub struct PyBatchProcessor {
// Rust batch processor, constructed over this module's `PyProvider`.
processor: BatchProcessor<PyProvider>,
}
#[pymethods]
impl PyBatchProcessor {
#[new]
#[pyo3(signature = (reference_json=None))]
fn new(reference_json: Option<&str>) -> PyResult<Self> {
let provider = match reference_json {
Some(path) => PyProvider::from_json(Path::new(path))?,
None => PyProvider::test_data(),
};
Ok(Self {
processor: BatchProcessor::new(provider),
})
}
#[staticmethod]
fn from_manifest(manifest_path: &str) -> PyResult<Self> {
let provider = PyProvider::from_manifest(Path::new(manifest_path))?;
Ok(Self {
processor: BatchProcessor::new(provider),
})
}
fn parse(&self, variants: Vec<String>) -> PyBatchResult {
PyBatchResult {
inner: self.processor.parse(&variants),
}
}
fn parse_and_normalize(&self, variants: Vec<String>) -> PyBatchResult {
PyBatchResult {
inner: self.processor.parse_and_normalize(&variants),
}
}
fn parse_with_progress(
&self,
variants: Vec<String>,
callback: Bound<'_, pyo3::PyAny>,
) -> PyResult<PyBatchResult> {
let result = self.processor.parse_with_progress(&variants, |progress| {
let py_progress = PyBatchProgress::from(&progress);
let _ = callback.call1((py_progress,));
});
Ok(PyBatchResult { inner: result })
}
}
/// Python-visible `ErrorMode` enum; mirrors the Rust `ErrorMode` variant for
/// variant, with explicit integer discriminants.
#[pyclass(name = "ErrorMode", eq, eq_int, from_py_object)]
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum PyErrorMode {
Strict = 0,
Lenient = 1,
Silent = 2,
}
/// One-to-one mapping from the Rust mode to its Python mirror.
impl From<ErrorMode> for PyErrorMode {
    fn from(mode: ErrorMode) -> Self {
        match mode {
            ErrorMode::Strict => Self::Strict,
            ErrorMode::Lenient => Self::Lenient,
            ErrorMode::Silent => Self::Silent,
        }
    }
}
/// One-to-one mapping from the Python mirror back to the Rust mode.
impl From<PyErrorMode> for ErrorMode {
    fn from(mode: PyErrorMode) -> Self {
        match mode {
            PyErrorMode::Strict => Self::Strict,
            PyErrorMode::Lenient => Self::Lenient,
            PyErrorMode::Silent => Self::Silent,
        }
    }
}
/// Python-visible `ErrorType` enum; mirrors the Rust `ErrorType` variant for
/// variant, with explicit integer discriminants.
#[pyclass(name = "ErrorType", eq, eq_int, from_py_object)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum PyErrorType {
LowercaseAminoAcid = 0,
MissingVersion = 1,
WrongDashCharacter = 2,
ExtraWhitespace = 3,
ProteinSubstitutionArrow = 4,
PositionZero = 5,
SingleLetterAminoAcid = 6,
WrongQuoteCharacter = 7,
LowercaseAccessionPrefix = 8,
MixedCaseEditType = 9,
OldSubstitutionSyntax = 10,
InvalidUnicodeCharacter = 11,
SwappedPositions = 12,
TrailingAnnotation = 13,
MissingCoordinatePrefix = 14,
OldAlleleFormat = 15,
RefSeqMismatch = 16,
}
impl From<ErrorType> for PyErrorType {
fn from(t: ErrorType) -> Self {
match t {
ErrorType::LowercaseAminoAcid => PyErrorType::LowercaseAminoAcid,
ErrorType::MissingVersion => PyErrorType::MissingVersion,
ErrorType::WrongDashCharacter => PyErrorType::WrongDashCharacter,
ErrorType::ExtraWhitespace => PyErrorType::ExtraWhitespace,
ErrorType::ProteinSubstitutionArrow => PyErrorType::ProteinSubstitutionArrow,
ErrorType::PositionZero => PyErrorType::PositionZero,
ErrorType::SingleLetterAminoAcid => PyErrorType::SingleLetterAminoAcid,
ErrorType::WrongQuoteCharacter => PyErrorType::WrongQuoteCharacter,
ErrorType::LowercaseAccessionPrefix => PyErrorType::LowercaseAccessionPrefix,
ErrorType::MixedCaseEditType => PyErrorType::MixedCaseEditType,
ErrorType::OldSubstitutionSyntax => PyErrorType::OldSubstitutionSyntax,
ErrorType::InvalidUnicodeCharacter => PyErrorType::InvalidUnicodeCharacter,
ErrorType::SwappedPositions => PyErrorType::SwappedPositions,
ErrorType::TrailingAnnotation => PyErrorType::TrailingAnnotation,
ErrorType::MissingCoordinatePrefix => PyErrorType::MissingCoordinatePrefix,
ErrorType::OldAlleleFormat => PyErrorType::OldAlleleFormat,
ErrorType::RefSeqMismatch => PyErrorType::RefSeqMismatch,
}
}
}
impl From<PyErrorType> for ErrorType {
fn from(t: PyErrorType) -> Self {
match t {
PyErrorType::LowercaseAminoAcid => ErrorType::LowercaseAminoAcid,
PyErrorType::MissingVersion => ErrorType::MissingVersion,
PyErrorType::WrongDashCharacter => ErrorType::WrongDashCharacter,
PyErrorType::ExtraWhitespace => ErrorType::ExtraWhitespace,
PyErrorType::ProteinSubstitutionArrow => ErrorType::ProteinSubstitutionArrow,
PyErrorType::PositionZero => ErrorType::PositionZero,
PyErrorType::SingleLetterAminoAcid => ErrorType::SingleLetterAminoAcid,
PyErrorType::WrongQuoteCharacter => ErrorType::WrongQuoteCharacter,
PyErrorType::LowercaseAccessionPrefix => ErrorType::LowercaseAccessionPrefix,
PyErrorType::MixedCaseEditType => ErrorType::MixedCaseEditType,
PyErrorType::OldSubstitutionSyntax => ErrorType::OldSubstitutionSyntax,
PyErrorType::InvalidUnicodeCharacter => ErrorType::InvalidUnicodeCharacter,
PyErrorType::SwappedPositions => ErrorType::SwappedPositions,
PyErrorType::TrailingAnnotation => ErrorType::TrailingAnnotation,
PyErrorType::MissingCoordinatePrefix => ErrorType::MissingCoordinatePrefix,
PyErrorType::OldAlleleFormat => ErrorType::OldAlleleFormat,
PyErrorType::RefSeqMismatch => ErrorType::RefSeqMismatch,
}
}
}
/// Python-facing mirror of the core `ErrorOverride` enum, exposed to Python as
/// `ErrorOverride`.
///
/// Variants convert one-to-one to and from `ErrorOverride` through the `From`
/// impls in this file. The explicit discriminants (0 through 4) fix the integer
/// value of each variant.
#[pyclass(name = "ErrorOverride", eq, eq_int, from_py_object)]
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum PyErrorOverride {
Default = 0,
Reject = 1,
WarnCorrect = 2,
SilentCorrect = 3,
Accept = 4,
}
/// Converts a core [`ErrorOverride`] into its Python-facing counterpart,
/// variant name for variant name.
impl From<ErrorOverride> for PyErrorOverride {
    fn from(core: ErrorOverride) -> Self {
        match core {
            ErrorOverride::Default => Self::Default,
            ErrorOverride::Reject => Self::Reject,
            ErrorOverride::WarnCorrect => Self::WarnCorrect,
            ErrorOverride::SilentCorrect => Self::SilentCorrect,
            ErrorOverride::Accept => Self::Accept,
        }
    }
}
/// Converts the Python-facing override back into the core [`ErrorOverride`],
/// variant name for variant name.
impl From<PyErrorOverride> for ErrorOverride {
    fn from(py_override: PyErrorOverride) -> Self {
        match py_override {
            PyErrorOverride::Default => Self::Default,
            PyErrorOverride::Reject => Self::Reject,
            PyErrorOverride::WarnCorrect => Self::WarnCorrect,
            PyErrorOverride::SilentCorrect => Self::SilentCorrect,
            PyErrorOverride::Accept => Self::Accept,
        }
    }
}
/// Python-facing copy of a core `CorrectionWarning`, exposed to Python as
/// `CorrectionWarning`.
///
/// All four fields are read-only attributes on the Python side
/// (`#[pyo3(get)]`) and are filled from the identically named fields of the
/// core warning.
#[pyclass(name = "CorrectionWarning", from_py_object)]
#[derive(Clone)]
pub struct PyCorrectionWarning {
/// Category of the warning.
#[pyo3(get)]
pub error_type: PyErrorType,
/// Message text carried over from the core warning.
#[pyo3(get)]
pub message: String,
/// Text shown on the left of `->` in `__repr__`; presumably the input
/// fragment before correction (confirm against the core type).
#[pyo3(get)]
pub original: String,
/// Text shown on the right of `->` in `__repr__`; presumably the fragment
/// after correction (confirm against the core type).
#[pyo3(get)]
pub corrected: String,
}
impl From<&CorrectionWarning> for PyCorrectionWarning {
fn from(w: &CorrectionWarning) -> Self {
Self {
error_type: w.error_type.into(),
message: w.message.clone(),
original: w.original.clone(),
corrected: w.corrected.clone(),
}
}
}
#[pymethods]
impl PyCorrectionWarning {
    /// Returns `CorrectionWarning(<error type>: '<original>' -> '<corrected>')`,
    /// with the error type rendered through its `Debug` form.
    fn __repr__(&self) -> String {
        let Self {
            error_type,
            original,
            corrected,
            ..
        } = self;
        format!(
            "CorrectionWarning({:?}: '{}' -> '{}')",
            error_type, original, corrected
        )
    }
}
#[pyclass(name = "ErrorConfig", from_py_object)]
#[derive(Clone)]
pub struct PyErrorConfig {
inner: ErrorConfig,
}
#[pymethods]
impl PyErrorConfig {
#[staticmethod]
fn strict() -> Self {
Self {
inner: ErrorConfig::strict(),
}
}
#[staticmethod]
fn lenient() -> Self {
Self {
inner: ErrorConfig::lenient(),
}
}
#[staticmethod]
fn silent() -> Self {
Self {
inner: ErrorConfig::silent(),
}
}
fn with_override(&self, error_type: PyErrorType, override_: PyErrorOverride) -> Self {
Self {
inner: self
.inner
.clone()
.with_override(error_type.into(), override_.into()),
}
}
#[getter]
fn mode(&self) -> PyErrorMode {
self.inner.mode.into()
}
fn should_reject(&self, error_type: PyErrorType) -> bool {
self.inner.should_reject(error_type.into())
}
fn should_correct(&self, error_type: PyErrorType) -> bool {
self.inner.should_correct(error_type.into())
}
fn should_warn(&self, error_type: PyErrorType) -> bool {
self.inner.should_warn(error_type.into())
}
}
/// Result of a lenient parse, exposed to Python as `ParseResultWithWarnings`.
///
/// All fields are read-only attributes on the Python side.
#[pyclass(name = "ParseResultWithWarnings")]
pub struct PyParseResultWithWarnings {
    /// Variant parsed from `preprocessed_input`.
    #[pyo3(get)]
    pub variant: PyHgvsVariant,
    /// Warnings reported by the preprocessor.
    #[pyo3(get)]
    pub warnings: Vec<PyCorrectionWarning>,
    /// The string exactly as it was passed in.
    #[pyo3(get)]
    pub original_input: String,
    /// The string that was actually handed to the parser.
    #[pyo3(get)]
    pub preprocessed_input: String,
}

#[pymethods]
impl PyParseResultWithWarnings {
    /// `True` when the preprocessed string differs from the original input.
    fn had_corrections(&self) -> bool {
        let unchanged = self.original_input == self.preprocessed_input;
        !unchanged
    }

    /// `True` when at least one warning was recorded.
    fn has_warnings(&self) -> bool {
        let none_recorded = self.warnings.is_empty();
        !none_recorded
    }
}
#[pyfunction]
#[pyo3(signature = (hgvs_string, config=None))]
fn parse_lenient(
hgvs_string: &str,
config: Option<&PyErrorConfig>,
) -> PyResult<PyParseResultWithWarnings> {
let config = config
.map(|c| c.inner.clone())
.unwrap_or_else(ErrorConfig::lenient);
let preprocessor = config.preprocessor();
let preprocess_result = preprocessor.preprocess(hgvs_string);
if !preprocess_result.success {
return Err(PyValueError::new_err(format!(
"Preprocessing failed: {:?}",
preprocess_result.warnings
)));
}
let variant = parse_hgvs(&preprocess_result.preprocessed)
.map_err(|e| PyValueError::new_err(format!("Parse error: {}", e)))?;
Ok(PyParseResultWithWarnings {
variant: PyHgvsVariant { inner: variant },
warnings: preprocess_result
.warnings
.iter()
.map(|w| w.into())
.collect(),
original_input: hgvs_string.to_string(),
preprocessed_input: preprocess_result.preprocessed,
})
}
/// Python-facing wrapper around a core [`CodonChange`], exposed to Python as
/// `CodonChange`.
#[pyclass(name = "CodonChange", from_py_object)]
#[derive(Clone)]
pub struct PyCodonChange {
    inner: CodonChange,
}

#[pymethods]
impl PyCodonChange {
    /// String form of the wrapped change's `ref_codon`.
    #[getter]
    fn ref_codon(&self) -> String {
        let change = &self.inner;
        change.ref_codon.to_string()
    }

    /// String form of the wrapped change's `alt_codon`.
    #[getter]
    fn alt_codon(&self) -> String {
        let change = &self.inner;
        change.alt_codon.to_string()
    }

    /// Copy of the wrapped change's `changed_positions`.
    #[getter]
    fn changed_positions(&self) -> Vec<u8> {
        let change = &self.inner;
        change.changed_positions.clone()
    }

    /// Number of entries in the wrapped change's `nucleotide_changes`.
    fn num_changes(&self) -> usize {
        let change = &self.inner;
        change.nucleotide_changes.len()
    }

    /// Display form of the wrapped change.
    fn __str__(&self) -> String {
        let change = &self.inner;
        change.to_string()
    }

    /// Returns `CodonChange(<ref> -> <alt>, positions=[...])`.
    fn __repr__(&self) -> String {
        let change = &self.inner;
        format!(
            "CodonChange({} -> {}, positions={:?})",
            change.ref_codon, change.alt_codon, change.changed_positions
        )
    }
}
/// Python-facing wrapper around a core [`CodonTable`], exposed to Python as
/// `CodonTable`.
#[pyclass(name = "CodonTable", from_py_object)]
#[derive(Clone)]
pub struct PyCodonTable {
    inner: CodonTable,
}

#[pymethods]
impl PyCodonTable {
    /// Wraps the table returned by `CodonTable::standard()`.
    #[staticmethod]
    fn standard() -> Self {
        let inner = CodonTable::standard();
        Self { inner }
    }
}
#[pyclass(name = "Backtranslator")]
pub struct PyBacktranslator {
inner: Backtranslator,
}
#[pymethods]
impl PyBacktranslator {
#[new]
#[pyo3(signature = (codon_table=None))]
fn new(codon_table: Option<&PyCodonTable>) -> Self {
let table = codon_table
.map(|t| t.inner.clone())
.unwrap_or_else(CodonTable::standard);
Self {
inner: Backtranslator::new(table),
}
}
#[staticmethod]
fn standard() -> Self {
Self {
inner: Backtranslator::standard(),
}
}
fn backtranslate_substitution(
&self,
ref_aa: &str,
alt_aa: &str,
) -> PyResult<Vec<PyCodonChange>> {
let ref_aa = parse_amino_acid(ref_aa)?;
let alt_aa = parse_amino_acid(alt_aa)?;
Ok(self
.inner
.backtranslate_substitution(&ref_aa, &alt_aa)
.into_iter()
.map(|c| PyCodonChange { inner: c })
.collect())
}
fn backtranslate_to_stop(&self, ref_aa: &str) -> PyResult<Vec<PyCodonChange>> {
let ref_aa = parse_amino_acid(ref_aa)?;
Ok(self
.inner
.backtranslate_to_stop(&ref_aa)
.into_iter()
.map(|c| PyCodonChange { inner: c })
.collect())
}
fn backtranslate_stop_loss(&self, alt_aa: &str) -> PyResult<Vec<PyCodonChange>> {
let alt_aa = parse_amino_acid(alt_aa)?;
Ok(self
.inner
.backtranslate_stop_loss(&alt_aa)
.into_iter()
.map(|c| PyCodonChange { inner: c })
.collect())
}
}
/// Parses an rsID string into its numeric value via the core rsID parser.
///
/// Raises `ValueError` with an `Invalid rsID: ...` message when parsing fails.
#[pyfunction]
fn parse_rsid_value(rsid: &str) -> PyResult<u64> {
    match rust_parse_rsid(rsid) {
        Ok(number) => Ok(number),
        Err(e) => Err(PyValueError::new_err(format!("Invalid rsID: {}", e))),
    }
}
/// Formats a numeric rsID as a string by delegating to the core `format_rsid`.
#[pyfunction]
fn format_rsid_value(rsid_num: u64) -> String {
format_rsid(rsid_num)
}
/// Python-facing wrapper around a core [`RsIdResult`], exposed to Python as
/// `RsIdResult`.
#[pyclass(name = "RsIdResult", from_py_object)]
#[derive(Clone)]
pub struct PyRsIdResult {
    inner: RsIdResult,
}

#[pymethods]
impl PyRsIdResult {
    /// The `rsid` field of the wrapped result.
    #[getter]
    fn rsid(&self) -> &str {
        let record = &self.inner;
        &record.rsid
    }

    /// The `contig` field of the wrapped result.
    #[getter]
    fn contig(&self) -> &str {
        let record = &self.inner;
        &record.contig
    }

    /// The `position` field of the wrapped result.
    #[getter]
    fn position(&self) -> u64 {
        let record = &self.inner;
        record.position
    }

    /// The `reference` field of the wrapped result.
    #[getter]
    fn reference(&self) -> &str {
        let record = &self.inner;
        &record.reference
    }

    /// The `alternate` field of the wrapped result.
    #[getter]
    fn alternate(&self) -> &str {
        let record = &self.inner;
        &record.alternate
    }

    /// The `hgvs` field of the wrapped result, or `None` when unset.
    #[getter]
    fn hgvs(&self) -> Option<&str> {
        let record = &self.inner;
        record.hgvs.as_deref()
    }

    /// The `allele_frequency` field of the wrapped result, or `None` when unset.
    #[getter]
    fn allele_frequency(&self) -> Option<f64> {
        let record = &self.inner;
        record.allele_frequency
    }

    /// The `clinical_significance` field of the wrapped result, or `None` when
    /// unset.
    #[getter]
    fn clinical_significance(&self) -> Option<&str> {
        let record = &self.inner;
        record.clinical_significance.as_deref()
    }

    /// Forwards to `RsIdResult::is_snv`.
    fn is_snv(&self) -> bool {
        let record = &self.inner;
        record.is_snv()
    }

    /// Forwards to `RsIdResult::is_deletion`.
    fn is_deletion(&self) -> bool {
        let record = &self.inner;
        record.is_deletion()
    }

    /// Forwards to `RsIdResult::is_insertion`.
    fn is_insertion(&self) -> bool {
        let record = &self.inner;
        record.is_insertion()
    }

    /// Forwards to `RsIdResult::to_hgvs`.
    fn to_hgvs(&self) -> String {
        let record = &self.inner;
        record.to_hgvs()
    }

    /// Returns `RsIdResult(<rsid>, <contig>:<position>, <reference>><alternate>)`.
    fn __repr__(&self) -> String {
        let record = &self.inner;
        format!(
            "RsIdResult({}, {}:{}, {}>{})",
            record.rsid, record.contig, record.position, record.reference, record.alternate
        )
    }
}
/// Python-facing wrapper around a core [`InMemoryRsIdLookup`], exposed to
/// Python as `InMemoryRsIdLookup`.
#[pyclass(name = "InMemoryRsIdLookup")]
pub struct PyInMemoryRsIdLookup {
    inner: InMemoryRsIdLookup,
}

#[pymethods]
impl PyInMemoryRsIdLookup {
    /// Wraps the lookup returned by `InMemoryRsIdLookup::with_test_data()`.
    #[staticmethod]
    fn with_test_data() -> Self {
        let inner = InMemoryRsIdLookup::with_test_data();
        Self { inner }
    }

    /// Returns every result the wrapped lookup has for `rsid`.
    ///
    /// Raises `ValueError` with a `Lookup failed: ...` message when the
    /// underlying lookup returns an error.
    fn lookup(&self, rsid: &str) -> PyResult<Vec<PyRsIdResult>> {
        use crate::rsid::RsIdLookup;
        match self.inner.lookup(rsid) {
            Ok(hits) => Ok(hits
                .into_iter()
                .map(|inner| PyRsIdResult { inner })
                .collect()),
            Err(e) => Err(PyValueError::new_err(format!("Lookup failed: {}", e))),
        }
    }

    /// Forwards to the wrapped lookup's `contains`.
    fn contains(&self, rsid: &str) -> bool {
        use crate::rsid::RsIdLookup;
        let store = &self.inner;
        store.contains(rsid)
    }
}
/// Python-facing wrapper around a core [`VcfRecord`], exposed to Python as
/// `VcfRecord`.
#[pyclass(name = "VcfRecord", from_py_object)]
#[derive(Clone)]
pub struct PyVcfRecord {
    inner: VcfRecord,
}

// Plain impl (not `#[pymethods]`): this helper is not exported to Python.
impl PyVcfRecord {
    /// Extracts the one and only character of `value`.
    ///
    /// `param` is the Python argument name used in the error message. Empty
    /// strings and strings with more than one character are both rejected, so
    /// that e.g. `"AT"` is never silently truncated to `'A'`.
    fn single_base(value: &str, param: &str) -> PyResult<char> {
        let mut chars = value.chars();
        match (chars.next(), chars.next()) {
            (Some(base), None) => Ok(base),
            (None, _) => Err(PyValueError::new_err(format!(
                "{} must be a non-empty string",
                param
            ))),
            (Some(_), Some(_)) => Err(PyValueError::new_err(format!(
                "{} must be a single character, got '{}'",
                param, value
            ))),
        }
    }
}

#[pymethods]
impl PyVcfRecord {
    /// Creates a record with a single alternate allele.
    ///
    /// `alternate` is stored as a one-element list. Quality, filter and format
    /// are left unset, info and samples are empty, and the genome build is
    /// `GenomeBuild::default()`.
    #[new]
    #[pyo3(signature = (chrom, pos, reference, alternate, id=None))]
    fn new(chrom: &str, pos: u64, reference: &str, alternate: &str, id: Option<&str>) -> Self {
        Self {
            inner: VcfRecord {
                chrom: chrom.to_string(),
                pos,
                id: id.map(str::to_string),
                reference: reference.to_string(),
                alternate: vec![alternate.to_string()],
                quality: None,
                filter: None,
                info: std::collections::HashMap::new(),
                format: None,
                samples: Vec::new(),
                genome_build: GenomeBuild::default(),
            },
        }
    }

    /// Creates a single-nucleotide-variant record via `VcfRecord::snv`.
    ///
    /// Raises `ValueError` when `ref_base` or `alt_base` is not exactly one
    /// character long.
    #[staticmethod]
    fn snv(chrom: &str, pos: u64, ref_base: &str, alt_base: &str) -> PyResult<Self> {
        let ref_char = Self::single_base(ref_base, "ref_base")?;
        let alt_char = Self::single_base(alt_base, "alt_base")?;
        Ok(Self {
            inner: VcfRecord::snv(chrom, pos, ref_char, alt_char),
        })
    }

    /// The record's `chrom` field.
    #[getter]
    fn chrom(&self) -> &str {
        &self.inner.chrom
    }

    /// The record's `pos` field.
    #[getter]
    fn pos(&self) -> u64 {
        self.inner.pos
    }

    /// The record's `id` field, or `None` when unset.
    #[getter]
    fn id(&self) -> Option<&str> {
        self.inner.id.as_deref()
    }

    /// The record's reference allele.
    #[getter]
    fn reference(&self) -> &str {
        &self.inner.reference
    }

    /// The first alternate allele, or `None` when the record has none.
    #[getter]
    fn alternate(&self) -> Option<&str> {
        self.inner.alternate.first().map(String::as_str)
    }

    /// A copy of all alternate alleles.
    #[getter]
    fn alternates(&self) -> Vec<String> {
        self.inner.alternate.clone()
    }

    /// Returns `VcfRecord(<chrom>:<pos> <ref>><alts>)`, with the alternates
    /// joined by commas.
    fn __repr__(&self) -> String {
        let alt_str = self.inner.alternate.join(",");
        format!(
            "VcfRecord({}:{} {}>{})",
            self.inner.chrom, self.inner.pos, self.inner.reference, alt_str
        )
    }
}
#[pyfunction]
#[pyo3(signature = (record, alt_index=0))]
fn vcf_to_genomic_hgvs(record: &PyVcfRecord, alt_index: usize) -> PyResult<PyHgvsVariant> {
rust_vcf_to_hgvs(&record.inner, alt_index)
.map(|v| PyHgvsVariant {
inner: HgvsVariant::Genome(v),
})
.map_err(|e| PyValueError::new_err(format!("Conversion error: {}", e)))
}
/// Python-facing wrapper around a core [`PrepareConfig`], exposed to Python as
/// `PrepareConfig`.
#[pyclass(name = "PrepareConfig", from_py_object)]
#[derive(Clone)]
pub struct PyPrepareConfig {
    inner: PrepareConfig,
}

#[pymethods]
impl PyPrepareConfig {
    /// Creates a preparation config.
    ///
    /// Defaults: `output_dir="ferro-reference"`, `download_transcripts=True`,
    /// `download_cdot=True`, `skip_existing=True`; every other flag is `False`.
    /// `clinvar_file` and `patterns_file` are not configurable from Python and
    /// are always left unset.
    #[new]
    #[pyo3(signature = (
        output_dir="ferro-reference",
        download_transcripts=true,
        download_genome=false,
        download_genome_grch37=false,
        download_refseqgene=false,
        download_lrg=false,
        download_cdot=true,
        skip_existing=true,
        dry_run=false,
        download_cdot_grch37=false
    ))]
    // One Rust parameter per Python keyword argument; the count is dictated by
    // the Python signature above.
    #[allow(clippy::too_many_arguments)]
    fn new(
        output_dir: &str,
        download_transcripts: bool,
        download_genome: bool,
        download_genome_grch37: bool,
        download_refseqgene: bool,
        download_lrg: bool,
        download_cdot: bool,
        skip_existing: bool,
        dry_run: bool,
        download_cdot_grch37: bool,
    ) -> Self {
        Self {
            inner: PrepareConfig {
                output_dir: std::path::PathBuf::from(output_dir),
                download_transcripts,
                download_genome,
                download_genome_grch37,
                download_refseqgene,
                download_lrg,
                download_cdot,
                download_cdot_grch37,
                skip_existing,
                clinvar_file: None,
                patterns_file: None,
                dry_run,
            },
        }
    }

    /// The output directory, rendered with `Path::display`.
    #[getter]
    fn output_dir(&self) -> String {
        self.inner.output_dir.display().to_string()
    }

    /// Value of the `download_transcripts` flag.
    #[getter]
    fn download_transcripts(&self) -> bool {
        self.inner.download_transcripts
    }

    /// Value of the `download_genome` flag.
    #[getter]
    fn download_genome(&self) -> bool {
        self.inner.download_genome
    }

    /// Value of the `download_genome_grch37` flag.
    #[getter]
    fn download_genome_grch37(&self) -> bool {
        self.inner.download_genome_grch37
    }

    /// Value of the `download_refseqgene` flag.
    #[getter]
    fn download_refseqgene(&self) -> bool {
        self.inner.download_refseqgene
    }

    /// Value of the `download_lrg` flag.
    #[getter]
    fn download_lrg(&self) -> bool {
        self.inner.download_lrg
    }

    /// Value of the `download_cdot` flag.
    #[getter]
    fn download_cdot(&self) -> bool {
        self.inner.download_cdot
    }

    /// Value of the `download_cdot_grch37` flag.
    #[getter]
    fn download_cdot_grch37(&self) -> bool {
        self.inner.download_cdot_grch37
    }

    /// Value of the `skip_existing` flag.
    #[getter]
    fn skip_existing(&self) -> bool {
        self.inner.skip_existing
    }

    /// Value of the `dry_run` flag.
    #[getter]
    fn dry_run(&self) -> bool {
        self.inner.dry_run
    }
}
/// Python-facing wrapper around a core [`ReferenceManifest`], exposed to Python
/// as `ReferenceManifest`.
#[pyclass(name = "ReferenceManifest", from_py_object)]
#[derive(Clone)]
pub struct PyReferenceManifest {
    inner: ReferenceManifest,
}

#[pymethods]
impl PyReferenceManifest {
    /// The manifest's `prepared_at` field.
    #[getter]
    fn prepared_at(&self) -> &str {
        let manifest = &self.inner;
        &manifest.prepared_at
    }

    /// The manifest's `transcript_count` field.
    #[getter]
    fn transcript_count(&self) -> usize {
        let manifest = &self.inner;
        manifest.transcript_count
    }

    /// Every transcript FASTA path, rendered with `Path::display`.
    #[getter]
    fn transcript_fastas(&self) -> Vec<String> {
        let mut rendered = Vec::new();
        for fasta in self.inner.transcript_fastas.iter() {
            rendered.push(fasta.display().to_string());
        }
        rendered
    }

    /// The genome FASTA path rendered with `Path::display`, or `None` when the
    /// manifest has none.
    #[getter]
    fn genome_fasta(&self) -> Option<String> {
        let fasta = self.inner.genome_fasta.as_ref()?;
        Some(fasta.display().to_string())
    }

    /// The cdot JSON path rendered with `Path::display`, or `None` when the
    /// manifest has none.
    #[getter]
    fn cdot_json(&self) -> Option<String> {
        let json = self.inner.cdot_json.as_ref()?;
        Some(json.display().to_string())
    }

    /// The GRCh37 cdot JSON path rendered with `Path::display`, or `None` when
    /// the manifest has none.
    #[getter]
    fn cdot_grch37_json(&self) -> Option<String> {
        let json = self.inner.cdot_grch37_json.as_ref()?;
        Some(json.display().to_string())
    }

    /// A copy of the manifest's `available_prefixes`.
    #[getter]
    fn available_prefixes(&self) -> Vec<String> {
        let manifest = &self.inner;
        manifest.available_prefixes.clone()
    }

    /// Returns `ReferenceManifest(transcripts=<count>, prefixes=[...])`.
    fn __repr__(&self) -> String {
        let manifest = &self.inner;
        format!(
            "ReferenceManifest(transcripts={}, prefixes={:?})",
            manifest.transcript_count, manifest.available_prefixes
        )
    }
}
/// Runs the core `prepare_references` with `config` and returns the resulting
/// manifest.
///
/// Raises `RuntimeError` with a `Prepare failed: ...` message on failure.
#[pyfunction]
fn prepare_reference_data(config: &PyPrepareConfig) -> PyResult<PyReferenceManifest> {
    match prepare_references(&config.inner) {
        Ok(inner) => Ok(PyReferenceManifest { inner }),
        Err(e) => Err(PyRuntimeError::new_err(format!("Prepare failed: {}", e))),
    }
}
/// Runs the core `check_references` on `directory` and returns the resulting
/// manifest.
///
/// Raises `RuntimeError` with a `Check failed: ...` message on failure.
#[pyfunction]
fn check_reference_data(directory: &str) -> PyResult<PyReferenceManifest> {
    let reference_dir = Path::new(directory);
    match check_references(reference_dir) {
        Ok(inner) => Ok(PyReferenceManifest { inner }),
        Err(e) => Err(PyRuntimeError::new_err(format!("Check failed: {}", e))),
    }
}
/// Python-facing mirror of the core `GenomeBuild` enum, exposed to Python as
/// `GenomeBuild`.
///
/// The explicit discriminants (0 through 2) fix the integer value of each
/// variant.
#[pyclass(name = "GenomeBuild", eq, eq_int, from_py_object)]
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum PyGenomeBuild {
GRCh37 = 0,
GRCh38 = 1,
Unknown = 2,
}
/// Converts a core [`GenomeBuild`] into its Python-facing counterpart, variant
/// name for variant name.
impl From<GenomeBuild> for PyGenomeBuild {
    fn from(build: GenomeBuild) -> Self {
        match build {
            GenomeBuild::GRCh37 => Self::GRCh37,
            GenomeBuild::GRCh38 => Self::GRCh38,
            GenomeBuild::Unknown => Self::Unknown,
        }
    }
}
#[pymethods]
impl PyGenomeBuild {
fn __str__(&self) -> &'static str {
match self {
PyGenomeBuild::GRCh37 => "GRCh37",
PyGenomeBuild::GRCh38 => "GRCh38",
PyGenomeBuild::Unknown => "Unknown",
}
}
}
/// Python-facing mirror of the core `Strand` enum, exposed to Python as
/// `Strand`.
///
/// The explicit discriminants fix the integer value of each variant
/// (`Plus` = 0, `Minus` = 1).
#[pyclass(name = "Strand", eq, eq_int, from_py_object)]
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum PyStrand {
Plus = 0,
Minus = 1,
}
/// Converts a core [`Strand`] into its Python-facing counterpart, variant name
/// for variant name.
impl From<Strand> for PyStrand {
    fn from(strand: Strand) -> Self {
        match strand {
            Strand::Plus => Self::Plus,
            Strand::Minus => Self::Minus,
        }
    }
}
#[pymethods]
impl PyStrand {
fn __str__(&self) -> &'static str {
match self {
PyStrand::Plus => "+",
PyStrand::Minus => "-",
}
}
}
#[pyclass(name = "CoordinateMapper")]
pub struct PyCoordinateMapper {
provider: PyProvider,
}
#[pymethods]
impl PyCoordinateMapper {
#[new]
#[pyo3(signature = (reference_json=None))]
fn new(reference_json: Option<&str>) -> PyResult<Self> {
let provider = match reference_json {
Some(path) => PyProvider::from_json(Path::new(path))?,
None => PyProvider::test_data(),
};
Ok(Self { provider })
}
#[staticmethod]
fn from_manifest(manifest_path: &str) -> PyResult<Self> {
let provider = PyProvider::from_manifest(Path::new(manifest_path))?;
Ok(Self { provider })
}
#[pyo3(signature = (transcript_id, cds_position, offset=None))]
fn c_to_g(
&self,
transcript_id: &str,
cds_position: i64,
offset: Option<i64>,
) -> PyResult<Option<(String, u64)>> {
let transcript = self.provider.get_transcript(transcript_id).map_err(|e| {
PyValueError::new_err(format!("Transcript not found: {}: {}", transcript_id, e))
})?;
let mapper = CoordinateMapper::new(&transcript);
let effective_offset = offset.filter(|&o| o != 0);
let cds_pos = CdsPos {
base: cds_position,
offset: effective_offset,
utr3: false,
};
if effective_offset.is_some() {
match mapper.cds_to_genomic_with_intron(&cds_pos) {
Ok(genomic_pos) => {
let chrom = mapper
.chromosome()
.ok_or_else(|| PyValueError::new_err("Transcript has no chromosome"))?
.to_string();
Ok(Some((chrom, genomic_pos)))
}
Err(e) => Err(PyValueError::new_err(format!("Conversion error: {}", e))),
}
} else {
match mapper.cds_to_genomic(&cds_pos) {
Ok(Some(genomic_pos)) => {
let chrom = mapper
.chromosome()
.ok_or_else(|| PyValueError::new_err("Transcript has no chromosome"))?
.to_string();
Ok(Some((chrom, genomic_pos)))
}
Ok(None) => Ok(None),
Err(e) => Err(PyValueError::new_err(format!("Conversion error: {}", e))),
}
}
}
fn g_to_c(
&self,
transcript_id: &str,
genomic_position: u64,
) -> PyResult<(i64, Option<i64>, bool)> {
let transcript = self.provider.get_transcript(transcript_id).map_err(|e| {
PyValueError::new_err(format!("Transcript not found: {}: {}", transcript_id, e))
})?;
let mapper = CoordinateMapper::new(&transcript);
match mapper.genomic_to_cds(genomic_position) {
Ok(Some(cds_pos)) => Ok((cds_pos.base, cds_pos.offset, cds_pos.utr3)),
Ok(None) => {
mapper
.genomic_to_cds_intronic(genomic_position)
.map(|cds_pos| (cds_pos.base, cds_pos.offset, cds_pos.utr3))
.map_err(|e| {
PyValueError::new_err(format!(
"Position {} is outside transcript bounds or in an unsupported region: {}",
genomic_position, e
))
})
}
Err(e) => Err(PyValueError::new_err(format!("Conversion error: {}", e))),
}
}
fn c_to_p(&self, transcript_id: &str, cds_position: i64) -> PyResult<u64> {
let transcript = self.provider.get_transcript(transcript_id).map_err(|e| {
PyValueError::new_err(format!("Transcript not found: {}: {}", transcript_id, e))
})?;
let mapper = CoordinateMapper::new(&transcript);
let cds_pos = CdsPos::new(cds_position);
mapper
.cds_to_protein(&cds_pos)
.map(|prot_pos| prot_pos.number)
.map_err(|e| PyValueError::new_err(format!("Conversion error: {}", e)))
}
#[pyo3(signature = (transcript_id, cds_position, offset=None, utr3=false))]
fn c_to_n(
&self,
transcript_id: &str,
cds_position: i64,
offset: Option<i64>,
utr3: bool,
) -> PyResult<(i64, Option<i64>)> {
let transcript = self.provider.get_transcript(transcript_id).map_err(|e| {
PyValueError::new_err(format!("Transcript not found: {}: {}", transcript_id, e))
})?;
let mapper = CoordinateMapper::new(&transcript);
let cds_pos = CdsPos {
base: cds_position,
offset,
utr3,
};
mapper
.cds_to_tx(&cds_pos)
.map(|tx_pos| (tx_pos.base, tx_pos.offset))
.map_err(|e| PyValueError::new_err(format!("Conversion error: {}", e)))
}
#[pyo3(signature = (transcript_id, tx_position, offset=None, downstream=false))]
fn n_to_c(
&self,
transcript_id: &str,
tx_position: i64,
offset: Option<i64>,
downstream: bool,
) -> PyResult<(i64, Option<i64>, bool)> {
let transcript = self.provider.get_transcript(transcript_id).map_err(|e| {
PyValueError::new_err(format!("Transcript not found: {}: {}", transcript_id, e))
})?;
let mapper = CoordinateMapper::new(&transcript);
let tx_pos = TxPos {
base: tx_position,
offset,
downstream,
};
mapper
.tx_to_cds(&tx_pos)
.map(|cds_pos| (cds_pos.base, cds_pos.offset, cds_pos.utr3))
.map_err(|e| PyValueError::new_err(format!("Conversion error: {}", e)))
}
fn get_strand(&self, transcript_id: &str) -> PyResult<PyStrand> {
let transcript = self.provider.get_transcript(transcript_id).map_err(|e| {
PyValueError::new_err(format!("Transcript not found: {}: {}", transcript_id, e))
})?;
Ok(transcript.strand.into())
}
fn get_chromosome(&self, transcript_id: &str) -> PyResult<Option<String>> {
let transcript = self.provider.get_transcript(transcript_id).map_err(|e| {
PyValueError::new_err(format!("Transcript not found: {}: {}", transcript_id, e))
})?;
Ok(transcript.chromosome.clone())
}
fn has_transcript(&self, transcript_id: &str) -> bool {
self.provider.get_transcript(transcript_id).is_ok()
}
}
/// Python extension module `ferro_hgvs`.
///
/// Registers every Python-facing function and class of this file on the module
/// and sets `__version__` to the crate version.
#[pymodule]
fn ferro_hgvs(m: &Bound<'_, PyModule>) -> PyResult<()> {
// Module-level functions.
m.add_function(wrap_pyfunction!(parse, m)?)?;
m.add_function(wrap_pyfunction!(normalize, m)?)?;
m.add_function(wrap_pyfunction!(parse_spdi, m)?)?;
m.add_function(wrap_pyfunction!(hgvs_to_spdi, m)?)?;
m.add_function(wrap_pyfunction!(spdi_to_hgvs_variant, m)?)?;
m.add_function(wrap_pyfunction!(hgvs_pos_to_index, m)?)?;
m.add_function(wrap_pyfunction!(index_to_hgvs_pos, m)?)?;
m.add_function(wrap_pyfunction!(parse_mave_hgvs_variant, m)?)?;
m.add_function(wrap_pyfunction!(is_mave_short_form_variant, m)?)?;
m.add_function(wrap_pyfunction!(parse_lenient, m)?)?;
m.add_function(wrap_pyfunction!(parse_rsid_value, m)?)?;
m.add_function(wrap_pyfunction!(format_rsid_value, m)?)?;
m.add_function(wrap_pyfunction!(vcf_to_genomic_hgvs, m)?)?;
m.add_function(wrap_pyfunction!(prepare_reference_data, m)?)?;
m.add_function(wrap_pyfunction!(check_reference_data, m)?)?;
// Classes. Each is visible in Python under the `name = "..."` given in its
// `#[pyclass]` attribute, not under the Rust `Py*` identifier.
m.add_class::<PyHgvsVariant>()?;
m.add_class::<PyNormalizer>()?;
m.add_class::<PySpdiVariant>()?;
m.add_class::<PyZeroBasedPos>()?;
m.add_class::<PyOneBasedPos>()?;
m.add_class::<PyEquivalenceLevel>()?;
m.add_class::<PyEquivalenceResult>()?;
m.add_class::<PyEquivalenceChecker>()?;
m.add_class::<PyConsequence>()?;
m.add_class::<PyImpact>()?;
m.add_class::<PyProteinEffect>()?;
m.add_class::<PyEffectPredictor>()?;
m.add_class::<PyMaveContext>()?;
m.add_class::<PyBatchProgress>()?;
m.add_class::<PyBatchResult>()?;
m.add_class::<PyBatchProcessor>()?;
m.add_class::<PyErrorMode>()?;
m.add_class::<PyErrorType>()?;
m.add_class::<PyErrorOverride>()?;
m.add_class::<PyCorrectionWarning>()?;
m.add_class::<PyErrorConfig>()?;
m.add_class::<PyParseResultWithWarnings>()?;
m.add_class::<PyCodonChange>()?;
m.add_class::<PyCodonTable>()?;
m.add_class::<PyBacktranslator>()?;
m.add_class::<PyRsIdResult>()?;
m.add_class::<PyInMemoryRsIdLookup>()?;
m.add_class::<PyVcfRecord>()?;
m.add_class::<PyPrepareConfig>()?;
m.add_class::<PyReferenceManifest>()?;
m.add_class::<PyGenomeBuild>()?;
m.add_class::<PyStrand>()?;
m.add_class::<PyCoordinateMapper>()?;
// `env!` is expanded at compile time, so the version is baked into the binary.
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
Ok(())
}