redicat 0.4.2

REDICAT - RNA Editing Cellular Assessment Toolkit: A highly parallelized utility for analyzing RNA editing events in single-cell RNA-seq data
Documentation
use parking_lot::Mutex;
use redicat_lib::core::read_filter::DefaultReadFilter;
use redicat_lib::engine::position::pileup_position::PileupPosition;
use redicat_lib::engine::RegionProcessor;
use rust_htslib::{bam, bam::Read};
use rustc_hash::FxHashSet;
use std::cmp::max;
use std::convert::TryInto;
use std::path::PathBuf;
use std::sync::Arc;

use super::args::BulkConfig;

#[inline]
fn near_max_depth(effective_depth: u32, max_depth: u32) -> bool {
    if max_depth == 0 {
        return false;
    }

    // Mark positions when observed depth is within 0.01% of the configured cap:
    // effective_depth >= max_depth * (1 - 0.0001)
    (effective_depth as u64).saturating_mul(10_000) >= (max_depth as u64).saturating_mul(9_999)
}

/// Implements [`RegionProcessor`] for bulk pileup traversal.
pub struct BaseProcessor {
    reads: PathBuf,
    coord_base: u32,
    max_depth: u32,
    min_depth: u32,
    max_n_fraction: u32,
    min_baseq: Option<u8>,
    edited_mode: bool,
    editing_threshold: u32,
    read_filter: Arc<DefaultReadFilter>,
    reader_pool: Mutex<Vec<bam::IndexedReader>>,
    allowed_tids: Option<FxHashSet<u32>>,
}

impl BaseProcessor {
    pub fn from_config(
        config: &BulkConfig,
        read_filter: Arc<DefaultReadFilter>,
        allowed_tids: Option<FxHashSet<u32>>,
    ) -> Self {
        Self {
            reads: config.reads.clone(),
            coord_base: config.coord_offset,
            max_depth: config.max_depth,
            min_depth: config.min_depth,
            max_n_fraction: config.max_n_fraction.max(1),
            min_baseq: config.min_baseq.or(Some(30)),
            edited_mode: config.editing_mode(),
            editing_threshold: config.editing_threshold.max(1),
            read_filter,
            reader_pool: Mutex::new(Vec::new()),
            allowed_tids,
        }
    }
}

impl RegionProcessor for BaseProcessor {
    type P = PileupPosition;

    fn process_region(&self, tid: u32, start: u32, stop: u32) -> Vec<Self::P> {
        if let Some(allowed) = &self.allowed_tids {
            if !allowed.contains(&tid) {
                return Vec::new();
            }
        }

        let mut reader = {
            let mut pool = self.reader_pool.lock();
            pool.pop()
        }
        .unwrap_or_else(|| {
            bam::IndexedReader::from_path(&self.reads)
                .unwrap_or_else(|e| panic!("Failed to open {}: {}", self.reads.display(), e))
        });

        let header = reader.header().to_owned();
        reader
            .fetch((tid, start, stop))
            .expect("Fetched requested region");
        let mut pileup = reader.pileup();
        pileup.set_max_depth(std::cmp::min(i32::MAX.try_into().unwrap(), self.max_depth));

        let estimated = (stop.saturating_sub(start) / 10).max(16) as usize;
        let mut result = Vec::with_capacity(estimated);
        for p in pileup {
            let pileup = match p {
                Ok(p) => p,
                Err(_) => continue,
            };

            if pileup.pos() < start || pileup.pos() >= stop {
                continue;
            }

            let mut pos = PileupPosition::from_pileup(
                pileup,
                &header,
                self.read_filter.as_ref(),
                self.min_baseq,
            );
            pos.pos += self.coord_base;

            if self.max_depth > 0 {
                let combined_depth = pos
                    .depth
                    .saturating_add(pos.ref_skip)
                    .saturating_add(pos.fail);
                let effective_depth = combined_depth.max(pos.depth);
                pos.near_max_depth = near_max_depth(effective_depth, self.max_depth);
            } else {
                pos.near_max_depth = false;
            }

            let should_include = if self.edited_mode {
                let valid_value = max(pos.depth / self.editing_threshold, 1);
                let mut count = 0;
                if pos.a > valid_value {
                    count += 1;
                }
                if pos.t > valid_value {
                    count += 1;
                }
                if pos.g > valid_value {
                    count += 1;
                }
                if pos.c > valid_value {
                    count += 1;
                }

                (pos.depth >= self.min_depth)
                    && (count >= 2)
                    && (pos.n <= max(pos.depth / self.max_n_fraction, 2))
            } else {
                pos.depth >= self.min_depth
            };

            if should_include {
                result.push(pos);
            }
        }

        {
            let mut pool = self.reader_pool.lock();
            pool.push(reader);
        }

        result
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use redicat_lib::core::read_filter::DefaultReadFilter;
    use std::path::PathBuf;
    use std::sync::Arc;

    #[test]
    fn test_near_max_depth_boundary_at_threshold() {
        // 99.99% of 10,000 is 9,999; threshold should be inclusive.
        assert!(near_max_depth(9_999, 10_000));
    }

    #[test]
    fn test_near_max_depth_boundary_below_threshold() {
        assert!(!near_max_depth(9_998, 10_000));
    }

    #[test]
    fn test_near_max_depth_normal_cases() {
        // Should now be false under the previous 1% window but outside 0.01%.
        assert!(!near_max_depth(9_900, 10_000));

        // Values at or above max_depth remain flagged.
        assert!(near_max_depth(10_000, 10_000));
        assert!(near_max_depth(10_500, 10_000));
    }

    #[test]
    fn test_near_max_depth_edge_cases() {
        // max_depth disabled
        assert!(!near_max_depth(0, 0));
        assert!(!near_max_depth(10, 0));

        // Integer boundary where ceil(0.9999 * 9_999) == 9_999
        assert!(!near_max_depth(9_998, 9_999));
        assert!(near_max_depth(9_999, 9_999));

        // Large values should remain safe and deterministic.
        assert!(near_max_depth(u32::MAX, u32::MAX));
        assert!(!near_max_depth(u32::MAX - 1_000_000, u32::MAX));
    }

    #[test]
    fn test_process_region_chr22_smoke() {
        let bam = PathBuf::from("test/chr22.bam");
        if !bam.exists() {
            log::info!("Skipping chr22 smoke test – fixture missing");
            return;
        }

        let config = BulkConfig {
            reads: bam.clone(),
            output: PathBuf::from("test.tsv.gz"),
            threads: 2,
            chunksize: 10_000,
            min_baseq: Some(30),
            mapquality: 255,
            coord_offset: 1,
            max_depth: 10_000,
            min_depth: 10,
            max_n_fraction: 20,
            report_all: true,
            editing_threshold: 1000,
            all_contigs: true,
        };

        let processor = BaseProcessor::from_config(
            &config,
            Arc::new(DefaultReadFilter::new(config.mapquality)),
            None,
        );

        let results = processor.process_region(14, 22_901_237, 22_901_238);
        log::info!("Smoke test processed {} positions", results.len());
    }
}