kira-spliceqc 0.2.0

Deterministic, explainable splicing QC for single-cell expression data.
Documentation
use std::time::Instant;

use tracing::info;

use crate::cli::PipelineContext;
use crate::cli::config::{AnalysisMode, RunConfig, RunMode};
use crate::expression::ExpressionMatrix;
use crate::genesets::catalog::default_catalog_path;
use crate::genesets::load_catalog;
use crate::input::error::InputError;
use crate::output::pipeline_contract;
use crate::pipeline::stage0_input::run_stage0;
use crate::pipeline::stage1_expression::run_stage1;
use crate::pipeline::stage2_genesets::aggregate as run_stage2;
use crate::pipeline::stage3_isoform::compute as run_stage3;
use crate::pipeline::stage4_missplicing::compute as compute_missplicing;
use crate::pipeline::stage5_imbalance::compute as compute_imbalance;
use crate::pipeline::stage6_sis::run_stage6;
use crate::pipeline::stage7_output::{OutputOptions, run_stage7};
use crate::pipeline::stage8_coupling::compute as compute_coupling;
use crate::pipeline::stage9_exon_intron_bias::compute as compute_exon_intron;
use crate::pipeline::stage10_assembly_phase::compute as compute_assembly;
use crate::pipeline::stage11_splicing_noise::compute as compute_splicing_noise;
use crate::pipeline::stage12_cryptic_risk::compute as compute_cryptic_risk;
use crate::pipeline::stage13_collapse::compute as compute_collapse;
use crate::pipeline::stage15_splicing_instability::compute as compute_splicing_instability;

#[derive(Debug, thiserror::Error)]
pub enum SpliceQcError {
    #[error("invalid input: {0}")]
    InvalidInput(String),
    #[error("pipeline failure: {0}")]
    PipelineFailure(String),
    #[error(transparent)]
    Io(#[from] std::io::Error),
}

pub fn run_pipeline(config: RunConfig) -> Result<(), SpliceQcError> {
    if config.mode != AnalysisMode::Cell {
        return Err(SpliceQcError::InvalidInput(
            "sample mode is not implemented".to_string(),
        ));
    }

    if let Some(threads) = config.threads {
        rayon::ThreadPoolBuilder::new()
            .num_threads(threads)
            .build_global()
            .map_err(|e| SpliceQcError::InvalidInput(format!("failed to set threads: {e}")))?;
    }

    info!(
        target: "kira_spliceqc::cli::run",
        simd_backend = crate::simd::backend(),
        "compute backend selected"
    );

    let effective_out_dir = if config.run_mode == RunMode::Pipeline {
        pipeline_contract::pipeline_out_dir(&config.out_dir)
    } else {
        config.out_dir.clone()
    };
    std::fs::create_dir_all(&effective_out_dir)?;

    let stage0 = run_logged(0, || {
        run_stage0(&config.input, config.run_mode, config.cache_path.as_deref())
    })?;
    let stage1 = run_logged(1, || run_stage1(&stage0, &effective_out_dir))?;

    // Load catalog ONCE and pass &GenesetCatalog to all downstream consumers
    // (previously stages 2, 3 and pipeline contract each reloaded it).
    let catalog_path = default_catalog_path();
    let catalog = load_catalog(&catalog_path, &stage1)?;

    let stage2 = run_logged(2, || run_stage2(&stage1, &catalog))?;
    let stage3 = run_logged(3, || run_stage3(&stage1, &catalog))?;
    let stage4 = run_logged(4, || compute_missplicing(&stage2))?;
    let stage5 = run_logged(5, || compute_imbalance(&stage2))?;
    let stage6 = run_logged(6, || run_stage6(&stage3, &stage4, &stage5))?;
    let stage15 = run_logged(15, || Ok(compute_splicing_instability(&stage1)))?;

    let (stage8, stage9, stage10, stage11, stage12, stage13, stage14) = if config.extended {
        let stage8 = run_logged(8, || compute_coupling(&stage2))?;
        let stage9 = run_logged(9, || compute_exon_intron(&stage2))?;
        let stage10 = run_logged(10, || compute_assembly(&stage2))?;
        let stage11 = run_logged(11, || compute_splicing_noise(&stage2))?;
        let stage12 = run_logged(12, || compute_cryptic_risk(&stage3, &stage5))?;
        let stage13 = run_logged(13, || compute_collapse(&stage5, &stage6))?;
        (
            Some(stage8),
            Some(stage9),
            Some(stage10),
            Some(stage11),
            Some(stage12),
            Some(stage13),
            None,
        )
    } else {
        (None, None, None, None, None, None, None)
    };

    let cell_names = (0..stage1.n_cells())
        .map(|i| stage1.cell_name(i).to_string())
        .collect::<Vec<_>>();

    let context = PipelineContext {
        stage0,
        stage1,
        stage2,
        stage3,
        stage4,
        stage5,
        stage6,
        stage8,
        stage9,
        stage10,
        stage11,
        stage12,
        stage13,
        stage14,
        stage15,
    };

    if config.extended && context.stage14.is_none() {
        info!(target: "kira_spliceqc::cli::run", "skipping Stage 14 (no timecourse metadata)");
    }

    let summary = run_logged(7, || {
        run_stage7(
            &effective_out_dir,
            &cell_names,
            &context.stage3,
            &context.stage4,
            &context.stage5,
            &context.stage6,
            context.stage8.as_ref(),
            context.stage9.as_ref(),
            context.stage10.as_ref(),
            context.stage11.as_ref(),
            context.stage12.as_ref(),
            context.stage13.as_ref(),
            context.stage14.as_ref(),
            &context.stage15,
            OutputOptions {
                json: config.output_json,
                tsv: config.output_tsv,
            },
        )
    })?;

    if config.run_mode == RunMode::Pipeline {
        info!(
            target: "kira_spliceqc::cli::run",
            "starting pipeline contract artifact generation"
        );
        pipeline_contract::write_pipeline_contract(
            &effective_out_dir,
            &context.stage0,
            &context.stage1,
            &context.stage4,
            &context.stage5,
            &context.stage6,
            context.stage8.as_ref(),
            &context.stage15,
            &catalog,
        )?;
        info!(
            target: "kira_spliceqc::cli::run",
            "finished pipeline contract artifact generation"
        );
    }

    print!("{summary}");
    Ok(())
}

fn run_logged<T, F>(stage: usize, f: F) -> Result<T, SpliceQcError>
where
    F: FnOnce() -> Result<T, InputError>,
{
    info!(target: "kira_spliceqc::cli::run", "starting Stage {stage}");
    let start = Instant::now();
    let result = f().map_err(SpliceQcError::from)?;
    let elapsed = start.elapsed();
    let elapsed_fmt = format!("{elapsed:.2?}");
    info!(
        target: "kira_spliceqc::cli::run",
        "finished Stage {stage} (elapsed: {elapsed_fmt})"
    );
    Ok(result)
}

impl From<InputError> for SpliceQcError {
    fn from(err: InputError) -> Self {
        SpliceQcError::PipelineFailure(err.to_string())
    }
}