genome-sh 0.1.0

The jq of genomics. Fast, local, human-readable variant analysis.
use anyhow::{Context, Result};
use indicatif::{ProgressBar, ProgressStyle};
use std::io::{BufRead, BufReader};

use super::Database;

const CLINVAR_URL: &str =
    "https://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh38/clinvar.vcf.gz";

/// Download and import ClinVar data.
pub async fn download_clinvar(db: &Database) -> Result<()> {
    println!("Downloading ClinVar...");

    let response = reqwest::get(CLINVAR_URL)
        .await
        .context("Failed to download ClinVar")?;

    let total_size = response.content_length().unwrap_or(0);
    let pb = progress_bar(total_size, "ClinVar");

    let bytes = response.bytes().await?;
    pb.finish_with_message("Downloaded");

    println!("Importing ClinVar variants...");

    let decoder = flate2::read::GzDecoder::new(bytes.as_ref());
    let reader = BufReader::new(decoder);

    let conn = db.connection();
    conn.execute_batch("BEGIN TRANSACTION")?;

    let mut insert_variant = conn.prepare_cached(
        "INSERT OR IGNORE INTO variants (chrom, pos, ref, alt, rsid, gene, assembly)
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'GRCh38')",
    )?;

    let mut insert_clinvar = conn.prepare_cached(
        "INSERT OR REPLACE INTO clinvar (variant_id, significance, review_stars, conditions, submission_count)
         VALUES (
            (SELECT id FROM variants WHERE chrom = ?1 AND pos = ?2 AND ref = ?3 AND alt = ?4),
            ?5, ?6, ?7, ?8
         )",
    )?;

    let mut count = 0u64;

    for line in reader.lines() {
        let line = line?;
        if line.starts_with('#') {
            continue;
        }

        let fields: Vec<&str> = line.splitn(8, '\t').collect();
        if fields.len() < 8 {
            continue;
        }

        let chrom = normalize_chrom(fields[0]);
        let pos: i64 = fields[1].parse().unwrap_or(0);
        let reference = fields[3];
        let alt = fields[4];
        let info = fields[7];

        // ClinVar stores rsIDs in INFO as RS=<number>, not in the ID column.
        let rsid = if fields[2].starts_with("rs") {
            Some(fields[2].to_string())
        } else {
            extract_info_field(info, "RS").map(|n| format!("rs{n}"))
        };

        let gene = extract_info_field(info, "GENEINFO")
            .map(|g| g.split(':').next().unwrap_or("").to_string());
        let significance = extract_info_field(info, "CLNSIG");
        let review_stars = extract_review_stars(info);
        let conditions = extract_info_field(info, "CLNDN");

        // Handle multi-allelic sites.
        for single_alt in alt.split(',') {
            insert_variant.execute(rusqlite::params![
                chrom,
                pos,
                reference,
                single_alt,
                rsid,
                gene,
            ])?;

            if let Some(ref sig) = significance {
                insert_clinvar.execute(rusqlite::params![
                    chrom,
                    pos,
                    reference,
                    single_alt,
                    sig,
                    review_stars,
                    conditions,
                    1i32, // submission_count placeholder
                ])?;
            }

            count += 1;
        }

        if count % 100_000 == 0 {
            eprint!("\r  Imported {count} variants...");
        }
    }

    conn.execute_batch("COMMIT")?;

    // Record metadata.
    conn.execute(
        "INSERT OR REPLACE INTO db_meta (source, version, updated_at, variant_count)
         VALUES ('clinvar', 'latest', datetime('now'), ?1)",
        [count as i64],
    )?;

    println!("\r  Imported {count} ClinVar variants.");

    Ok(())
}

/// Download and import gnomAD data.
pub async fn download_gnomad(_db: &Database) -> Result<()> {
    // gnomAD is very large. For the standard tier, we download the
    // allele-frequency-only subset or the sites VCF and extract AF fields.
    eprintln!("gnomAD download is not yet implemented (requires ~5 GB download).");
    eprintln!("For now, install the lite tier: genome db install lite");
    Ok(())
}

/// Download and import dbSNP data.
pub async fn download_dbsnp(_db: &Database) -> Result<()> {
    eprintln!("dbSNP download is not yet implemented.");
    Ok(())
}

/// Download and import PharmGKB data.
pub async fn download_pharmgkb(_db: &Database) -> Result<()> {
    eprintln!("PharmGKB download is not yet implemented.");
    Ok(())
}

fn normalize_chrom(chrom: &str) -> String {
    if chrom.starts_with("chr") {
        chrom.to_string()
    } else {
        format!("chr{chrom}")
    }
}

fn extract_info_field<'a>(info: &'a str, key: &str) -> Option<String> {
    for field in info.split(';') {
        if let Some(value) = field.strip_prefix(key).and_then(|s| s.strip_prefix('=')) {
            return Some(value.replace('_', " "));
        }
    }
    None
}

fn extract_review_stars(info: &str) -> i32 {
    let rev_stat = extract_info_field(info, "CLNREVSTAT").unwrap_or_default();
    match rev_stat.as_str() {
        s if s.contains("practice_guideline") => 4,
        s if s.contains("reviewed_by_expert_panel") => 3,
        s if s.contains("criteria_provided,_multiple_submitters") => 2,
        s if s.contains("criteria_provided,_single_submitter") => 1,
        _ => 0,
    }
}

fn progress_bar(total: u64, label: &str) -> ProgressBar {
    let pb = if total > 0 {
        ProgressBar::new(total)
    } else {
        ProgressBar::new_spinner()
    };

    pb.set_style(
        ProgressStyle::default_bar()
            .template(&format!(
                "  {label} [{{bar:40.cyan/blue}}] {{bytes}}/{{total_bytes}} ({{eta}})"
            ))
            .unwrap_or_else(|_| ProgressStyle::default_bar())
            .progress_chars("=> "),
    );

    pb
}