Module seq_io::parallel


Experiments with parallel processing

The provided functions focus on the possibility of returning results while the parser proceeds. Sequences are processed in batches (RecordSet) because sending across channels has a performance impact. FASTA/FASTQ records can be accessed both in the ‘worker’ function and (after processing) in a function running in the main thread.
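The reasoning behind batching can be illustrated with the standard library alone. The following sketch uses plain `std::sync::mpsc` and integer "records" rather than seq_io types: one channel message carries a whole batch, so the per-message synchronization cost is amortized over many records.

```rust
use std::sync::mpsc;
use std::thread;

// Send records in batches over a bounded channel and count them on the
// receiving side; returns the total number of records received.
fn receive_batched() -> usize {
    let (tx, rx) = mpsc::sync_channel::<Vec<u32>>(2);
    let producer = thread::spawn(move || {
        // 3 batches of 4 "records" each: one channel message per batch,
        // not one per record, so synchronization cost is amortized.
        for batch in 0..3u32 {
            let records: Vec<u32> = (batch * 4..batch * 4 + 4).collect();
            tx.send(records).unwrap();
        }
        // tx is dropped here, which closes the channel.
    });
    let total: usize = rx.iter().map(|batch| batch.len()).sum();
    producer.join().unwrap();
    total
}

fn main() {
    assert_eq!(receive_batched(), 12);
    println!("ok");
}
```

The bounded `sync_channel` mirrors the queue length argument of the parallel functions below: it limits how many record sets are in flight at once.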

Searching for the first occurrence of a sequence pattern

use seq_io::fastq::{Reader,Record};
use seq_io::parallel::read_parallel;

let reader = Reader::from_path("seqs.fastq").unwrap();

read_parallel(reader, 4, 2, |record_set| {
    // this function does the heavy work
    for (i, record) in record_set.into_iter().enumerate() {
        // this is not very efficient code, just for demonstration
        if let Some(pos) = record.seq().windows(3).position(|s| s == b"AAA") {
            return Some((i, pos));
        }
    }
    None
}, |record_sets| {
    // This function runs in the main thread. It provides a streaming iterator over
    // record sets and the corresponding return values from the worker function
    // (not necessarily in the same order as in the file)
    while let Some(result) = record_sets.next() {
        let (record_set, found) = result.unwrap();
        if let Some((i, pos)) = found {
            let record = record_set.into_iter().nth(i).unwrap();
            println!("Found AAA in record {} at position {}", record.id().unwrap(), pos);
            // this will also stop the worker threads, although with some delay
            return;
        }
    }
});
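The pattern check in the worker closure above relies only on standard slice methods. As a standalone sketch (`find_pattern` is an illustrative helper, not part of seq_io):

```rust
// Find the first occurrence of a byte pattern in a sequence, mirroring
// the worker closure above (standard library only).
fn find_pattern(seq: &[u8], pat: &[u8]) -> Option<usize> {
    seq.windows(pat.len()).position(|s| s == pat)
}

fn main() {
    let seq = b"CGTAAAGCT";
    // "AAA" starts at index 3 in the sequence above
    assert_eq!(find_pattern(seq, b"AAA"), Some(3));
    assert_eq!(find_pattern(seq, b"TTT"), None);
    println!("ok");
}
```

As the comment in the example notes, this is not an efficient search; for large inputs a dedicated substring-search algorithm would be preferable.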

Per-record processing

The parallel_fasta / parallel_fastq functions are designed to efficiently pass results for each record to the main thread without requiring the caller to deal with record sets. This example filters sequences by the occurrence of a pattern:

use seq_io::fastq::{Reader,Record};
use seq_io::parallel::parallel_fastq;
use std::fs::File;
use std::io::BufWriter;

let reader = Reader::from_path("seqs.fastq").unwrap();
let mut writer = BufWriter::new(File::create("filtered.fastq").unwrap());

parallel_fastq(reader, 4, 2,
    |record, found| { // runs in worker
        *found = record.seq().windows(3).any(|s| s == b"AAA");
    },
    |record, found| { // runs in main thread
        if *found {
            record.write(&mut writer).unwrap();
        }
        // Some(value) will stop the reader, and the value will be returned.
        // If we never stop, the compiler needs a hint about the return type
        // parameter, hence the 'turbofish' notation below (see
        // https://github.com/rust-lang/rust/issues/27336 for possible progress).
        None::<()>
}).unwrap();

Structs

  • Wrapper for parallel::Reader instances allowing the output to be reused in order to save allocations. Used by parallel_fasta / parallel_fastq.

Traits

Functions

  • Function reading records in a separate thread, processing them in another worker thread, and finally returning the results to the main thread.
  • More customisable function doing per-record processing, with closures for initialization and more options.
  • Function reading records in a separate thread, processing them in another worker thread, and finally returning the results to the main thread.
  • More customisable function doing per-record processing, with closures for initialization and more options.
  • Using this function currently does not work due to a compiler bug.
  • This function allows initializing the reader and datasets using a closure. This is more flexible and allows the reader type not to be Send.