Module seq_io::parallel

Experiments with parallel processing

The provided functions focus on the possibility of returning results while the parser proceeds. Sequences are processed in batches (RecordSet) because sending across channels has a performance impact. FASTA/FASTQ records can be accessed both in the 'worker' function and (after processing) in a function running in the main thread.

Search for the first occurrence of a sequence pattern

use seq_io::fastq::{Reader,Record};
use seq_io::parallel::read_parallel;

let reader = Reader::from_path("seqs.fastq").unwrap();

read_parallel(reader, 4, 2, |record_set| {
    // this function does the heavy work
    for (i, record) in record_set.into_iter().enumerate() {
        // this is not very efficient code, just for demonstration
        if let Some(pos) = record.seq().windows(3).position(|s| s == b"AAA") {
            return Some((i, pos));
        }
    }
    None
}, |record_sets| {
    // This function runs in the main thread. It provides a streaming iterator over
    // record sets and the corresponding return values from the worker function
    // (not necessarily in the same order as in the file)
    while let Some(result) = record_sets.next() {
        let (record_set, found) = result.unwrap();
        if let Some((i, pos)) = found {
            let record = record_set.into_iter().nth(i).unwrap();
            println!("Found AAA in record {} at position {}", record.id().unwrap(), pos);
            // this will also stop the worker threads, although with some delay
            return;
        }
    }
});

Per-record processing

The parallel_fasta / parallel_fastq functions are designed to efficiently pass results for each record to the main thread without having to deal with record sets. This example filters sequences by the occurrence of a pattern:

use seq_io::fastq::{Reader,Record};
use seq_io::parallel::parallel_fastq;
use std::fs::File;
use std::io::BufWriter;

let reader = Reader::from_path("seqs.fastq").unwrap();
let mut writer = BufWriter::new(File::create("filtered.fastq").unwrap());

parallel_fastq(reader, 4, 2,
    |record, found| { // runs in worker
        *found = record.seq().windows(3).position(|s| s == b"AAA").is_some();
    },
    |record, found| { // runs in main thread
        if *found {
            record.write(&mut writer).unwrap();
        }
        // Some(value) will stop the reader, and the value will be returned.
        // If the reader is never stopped, the compiler still needs a hint about
        // the type parameter, hence the 'turbofish' notation
        // (hopefully improved by https://github.com/rust-lang/rust/issues/27336).
        None::<()>
}).unwrap();

Structs

ParallelRecordsets
ReusableReader

Wrapper for parallel::Reader instances allowing the output to be reused in order to save allocations. Used by parallel_fasta / parallel_fastq.

Traits

Reader

Functions

parallel_fasta

Function reading records in one thread, processing them in another (worker) thread, and finally returning the results to the main thread.

parallel_fasta_init

More customisable function doing per-record processing, with closures for initialization and more options.

parallel_fastq

Function reading records in one thread, processing them in another (worker) thread, and finally returning the results to the main thread.

parallel_fastq_init

More customisable function doing per-record processing, with closures for initialization and more options.

parallel_records

Using this function currently does not work due to a compiler bug.

read_parallel
read_parallel_init

This function allows initializing the reader and datasets using closures. This is more flexible and does not require the reader to be Send.