Module seq_io::parallel

Functions for parallel processing of record sets and records.

Sequences are read and processed in batches (RecordSet) because sending data across channels has a performance impact. The process works as follows (a simplified sketch of this data flow follows the list):

  • Sequence parsing is done in a background thread
  • Record sets are sent to worker threads, where expensive operations take place (e.g. sequence analysis).
  • The results are sent to the main thread along with the record sets.
  • The record sets are recycled by sending them back to the background reader.
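
A minimal sketch of this data flow, using only std::sync::mpsc and plain Vec<String> batches as stand-ins for record sets. This is not seq_io's actual implementation, only an illustration of the batch-recycling pattern described above:

use std::sync::mpsc;
use std::thread;

// Channels: reader -> worker, worker -> main, main -> reader (recycling)
let (to_worker, from_reader) = mpsc::channel::<Vec<String>>();
let (to_main, from_worker) = mpsc::channel::<(Vec<String>, usize)>();
let (recycle_tx, recycle_rx) = mpsc::channel::<Vec<String>>();

// Background "reader": fills batches and sends them to the worker.
let reader = thread::spawn(move || {
    for batch_no in 0..3 {
        // Reuse a recycled buffer if one is available, otherwise allocate a new one.
        let mut batch = recycle_rx.try_recv().unwrap_or_default();
        batch.clear();
        batch.extend((0..4).map(|i| format!("record {}-{}", batch_no, i)));
        to_worker.send(batch).unwrap();
    }
});

// Worker: does the expensive part and forwards each batch together with its result.
let worker = thread::spawn(move || {
    for batch in from_reader {
        let total_len: usize = batch.iter().map(|r| r.len()).sum();
        to_main.send((batch, total_len)).unwrap();
    }
});

// Main thread: consumes the results and sends the batches back for recycling.
for (batch, total_len) in from_worker {
    println!("total length in batch: {}", total_len);
    let _ = recycle_tx.send(batch); // the reader may already have finished
}

reader.join().unwrap();
worker.join().unwrap();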

Per-record processing

The easiest to use are the functions that operate directly on sequence records without having to deal with record sets: read_process_fasta_records, read_process_fastq_records and read_process_fastx_records.

They are specific to the given sequence format, but it is possible to generate functions for other types using the parallel_record_impl macro.

Example

This example filters sequences by the occurrence of a pattern:

use seq_io::prelude::*;
use seq_io::fastq::{Reader,Record};
use seq_io::parallel::read_process_fastq_records;
use std::fs::File;
use std::io::BufWriter;

let reader = Reader::from_path("seqs.fastq").unwrap();
let mut writer = BufWriter::new(File::create("filtered.fastq").unwrap());

read_process_fastq_records(reader, 4, 2,
    |record, found| { // runs in worker
        *found = record.seq().windows(3).position(|s| s == b"AAA").is_some();
    },
    |record, found| { // runs in main thread
        if *found {
            record.write(&mut writer).unwrap();
        }
        // Some(value) will stop the reader, and the value will be returned.
        // In the case of never stopping, we need to give the compiler a hint about the
        // type parameter, thus the special 'turbofish' notation is needed.
        None::<()>
}).unwrap();

Record set processing

It is still possible to work directly with record sets using the following generic functions: read_process_recordsets and read_process_recordsets_init.

Example

This example searches for the first occurrence of a sequence pattern and then stops the parser.

use seq_io::prelude::*;
use seq_io::fastq;
use seq_io::parallel::read_process_recordsets;

let reader = fastq::Reader::from_path("seqs.fastq").unwrap();

read_process_recordsets(reader, 4, 2,
    |record_set, position| {
        // This function does the heavy work.
        // The code is not necessarily very efficient, just for demonstration.
        for (i, record) in record_set.into_iter().enumerate() {
            if let Some(pos) = record.seq().windows(3).position(|s| s == b"AAA") {
                *position = Some((i, pos));
                // stop after the first match; otherwise the assignment below
                // would overwrite the result
                return;
            }
        }
        *position = None;
    }, |mut record_sets| {
        // This function runs in the main thread. It provides a streaming iterator over
        // record sets and the corresponding return values from the worker function
        // (not necessarily in the same order as in the file)
        while let Some(result) = record_sets.next() {
            let (record_set, position) = result?;
            if let Some(&(i, pos)) = position.as_ref() {
                let record = record_set.into_iter().nth(i).unwrap();
                println!("Found AAA in record {} at position {}", record.id().unwrap(), pos);
                return Ok(());
            }
        }
        // Here, we need to give the compiler a type hint about the returned
        // result, since it is not smart enough to infer it.
        // In real-world programs, this may be less of an issue because the
        // returned result type is often known.
        Ok::<_, fastq::Error>(())
    }
).expect("FASTQ reading error");

Structs

ParallelDataSets

Traits

RecordSetReader

A simple trait required to be implemented for readers fed into the functions in this module.

Functions

read_process_fasta_records

This function wraps read_process_recordsets, hiding the complexity related to record sets and allowing the supplied closures to work directly on sequence records.

read_process_fasta_records_init

Like read_process_fasta_records, but additionally allows initiating the reader in the background thread using a closure (reader_init).

read_process_fastq_records

This function wraps read_process_recordsets, hiding the complexity related to record sets and allowing the supplied closures to work directly on sequence records.

read_process_fastq_records_init

Like read_process_fastq_records, but additionally allows initiating the reader in the background thread using a closure (reader_init).

read_process_fastx_records

This function wraps read_process_recordsets, hiding the complexity related to record sets and allowing the supplied closures to work directly on sequence records.

read_process_fastx_records_init

Like read_process_fastx_records, but additionally allows initiating the reader in the background thread using a closure (reader_init).

read_process_records_init

Using this function currently does not work due to a compiler bug.

read_process_recordsets

This function reads record sets and processes them in parallel threads.

read_process_recordsets_init

Like read_process_recordsets, but additionally allows initiating the reader in the background thread using a closure (reader_init). This is useful for readers that don't implement Send. The reader_init closure has to return a result. Errors from reader_init are returned from the main function without being mixed with reading errors; this may lead to a nested Result being returned if the func closure itself returns a Result.
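
As an illustration only, the record set example above could be adapted to read_process_recordsets_init roughly as sketched below. The argument order (a reader_init closure in place of the reader, followed by the thread and queue parameters and the two closures) and the error types are assumptions derived from the description above, not a verbatim API reference; consult the function signature for the exact details.

use seq_io::prelude::*;
use seq_io::fastq;
use seq_io::parallel::read_process_recordsets_init;

let nested = read_process_recordsets_init(
    // Assumed: the reader is created here, in the background thread,
    // so the reader type itself does not need to implement Send.
    || fastq::Reader::from_path("seqs.fastq"),
    4, 2,
    |record_set, found: &mut bool| {
        // worker: flag record sets that contain the pattern "AAA"
        *found = record_set.into_iter()
            .any(|r| r.seq().windows(3).any(|s| s == b"AAA"));
    },
    |mut record_sets| {
        while let Some(result) = record_sets.next() {
            let (_record_set, found) = result?;
            if *found {
                println!("Found a record set containing AAA");
                break;
            }
        }
        Ok::<_, fastq::Error>(())
    },
);
// Because `func` itself returns a Result, the overall return value is nested:
// the outer Result carries the reader_init error, the inner one is returned by `func`.
nested.expect("could not create the reader")
    .expect("FASTQ reading error");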