//! seq_io_parallel 0.2.1
//!
//! A map-reduce style parallel extension to `seq_io`.
//!
//! Example usage: `<program> <fastq-path> [num_threads]`.
use anyhow::{bail, Result};
use seq_io::fastq;
use seq_io_parallel::{MinimalRefRecord, ParallelProcessor, ParallelReader};
use std::sync::{atomic::AtomicUsize, Arc};

/// Example processor that performs a deliberately expensive per-record computation.
///
/// The `local_*` fields are plain values, so every clone accumulates its own
/// batch-local totals. The `global_*` fields are `Arc`-shared atomics, so all
/// clones of one instance publish into, and read from, the same counters.
#[derive(Clone, Debug, Default)]
pub struct ExpensiveCalculation {
    /// Running sum for the current batch on this clone.
    local_sum: usize,
    /// Number of records seen in the current batch on this clone.
    local_num_records: usize,
    /// Sum shared by every clone of this instance.
    global_sum: Arc<AtomicUsize>,
    /// Record count shared by every clone of this instance.
    global_num_records: Arc<AtomicUsize>,
}
impl ExpensiveCalculation {
    /// Returns the current value of the shared sum counter.
    ///
    /// The counter is common to every clone of this instance, so it reflects
    /// whatever any clone has published so far.
    pub fn get_global_sum(&self) -> usize {
        // NOTE(review): Relaxed is assumed adequate because totals are read once
        // processing has finished; confirm if read concurrently with workers.
        use std::sync::atomic::Ordering::Relaxed;
        self.global_sum.load(Relaxed)
    }

    /// Returns the current value of the shared record counter.
    ///
    /// Like the sum, this counter is common to every clone of this instance.
    pub fn get_global_num_records(&self) -> usize {
        use std::sync::atomic::Ordering::Relaxed;
        self.global_num_records.load(Relaxed)
    }
}
impl ParallelProcessor for ExpensiveCalculation {
    /// Performs deliberately CPU-heavy work on one record.
    ///
    /// A single pass sums `(seq[i] - 33) + (qual[i] - 33)` over paired
    /// positions. Pairing stops at the shorter of the two slices. The pass is
    /// repeated 100 times, and every pass total is added to `local_sum`. The
    /// record is also counted in `local_num_records`. The shared counters are
    /// not touched here.
    fn process_record<'a, Rf: MinimalRefRecord<'a>>(&mut self, record: Rf) -> Result<()> {
        let bases = record.ref_seq();
        let quals = record.ref_qual();

        // The repetition exists only to make the per-record work expensive.
        for _pass in 0..100 {
            // NOTE(review): `b - 33` / `q - 33` underflow on bytes below b'!'
            // (panic in debug, wrap in release); valid FASTQ never has them.
            let pass_total: usize = bases
                .iter()
                .zip(quals)
                .map(|(&b, &q)| usize::from(b - 33) + usize::from(q - 33))
                .sum();
            self.local_sum += pass_total;
        }

        self.local_num_records += 1;
        Ok(())
    }

    /// Adds this clone's batch-local totals to the shared counters and resets
    /// the local accumulators to zero for the next batch.
    fn on_batch_complete(&mut self) -> Result<()> {
        use std::sync::atomic::Ordering::Relaxed;

        let batch_sum = std::mem::take(&mut self.local_sum);
        let batch_records = std::mem::take(&mut self.local_num_records);

        self.global_sum.fetch_add(batch_sum, Relaxed);
        self.global_num_records.fetch_add(batch_records, Relaxed);
        Ok(())
    }
}

pub fn main() -> Result<()> {
    let args = std::env::args().collect::<Vec<String>>();
    let path = match args.get(1) {
        Some(path) => path,
        None => bail!("No path provided"),
    };
    let num_threads = match args.get(2) {
        Some(num_threads) => num_threads.parse::<usize>()?,
        None => 1,
    };

    let (handle, _format) = niffler::send::from_path(path)?;
    let reader = fastq::Reader::new(handle);
    let processor = ExpensiveCalculation::default();
    reader.process_parallel(processor.clone(), num_threads)?;

    println!("Global sum: {}", processor.get_global_sum());
    println!("Global num records: {}", processor.get_global_num_records());

    Ok(())
}