Module seq_io::parallel
Functions for parallel processing of record sets and records.
Sequences are read and processed in batches (RecordSet) because sending data across channels has a performance impact. The process works as follows:
- Sequence parsing is done in a background thread.
- Record sets are sent to worker threads, where expensive operations take place (e.g. sequence analysis).
- The results are sent to the main thread along with the record sets.
- The record sets are recycled by sending them back to the background reader.
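The four steps above can be sketched with only the standard library. This is not seq_io's implementation: the "records" are plain byte vectors, the worker step just counts sequences containing a pattern, and the function name count_matches is hypothetical. A fixed number of batches is allocated once and then circulates from the reader to the workers, on to the main thread and back to the reader.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical example: counts sequences containing `pattern` using one reader
/// thread, `n_workers` workers and `n_buffers` recycled batches.
fn count_matches(
    seqs: Vec<Vec<u8>>,
    pattern: &'static [u8],
    n_workers: usize,
    n_buffers: usize,
    batch_size: usize,
) -> usize {
    assert!(n_workers > 0 && n_buffers > 0 && !pattern.is_empty());
    let (work_tx, work_rx) = mpsc::channel::<Vec<Vec<u8>>>();
    let (result_tx, result_rx) = mpsc::channel::<(Vec<Vec<u8>>, usize)>();
    let (recycle_tx, recycle_rx) = mpsc::channel::<Vec<Vec<u8>>>();

    // Pre-allocate the batches that circulate for the whole run.
    for _ in 0..n_buffers {
        recycle_tx.send(Vec::with_capacity(batch_size)).unwrap();
    }

    // Background "parser": refills recycled batches from the input.
    let reader = thread::spawn(move || {
        let mut input = seqs.into_iter();
        while let Ok(mut batch) = recycle_rx.recv() {
            batch.clear(); // keeps the outer Vec's allocation
            batch.extend(input.by_ref().take(batch_size));
            if batch.is_empty() {
                break; // input exhausted; dropping work_tx stops the workers
            }
            work_tx.send(batch).unwrap();
        }
    });

    // Workers share one queue; the Mutex hands each batch to exactly one worker.
    let work_rx = Arc::new(Mutex::new(work_rx));
    let mut workers = Vec::new();
    for _ in 0..n_workers {
        let work_rx = Arc::clone(&work_rx);
        let result_tx = result_tx.clone();
        workers.push(thread::spawn(move || loop {
            // The lock is held only while receiving, not while processing.
            let next = work_rx.lock().unwrap().recv();
            let batch = match next {
                Ok(batch) => batch,
                Err(_) => break, // reader finished and queue drained
            };
            // The "expensive" per-batch work.
            let n = batch
                .iter()
                .filter(|seq| seq.windows(pattern.len()).any(|w| w == pattern))
                .count();
            if result_tx.send((batch, n)).is_err() {
                break;
            }
        }));
    }
    drop(result_tx); // only the workers' clones remain

    // Main thread: consume results (in arbitrary order), then recycle batches.
    let mut total = 0;
    for (batch, n) in result_rx {
        total += n;
        // If the reader has already exited, the batch is simply dropped.
        let _ = recycle_tx.send(batch);
    }
    reader.join().unwrap();
    for w in workers {
        w.join().unwrap();
    }
    total
}

fn main() {
    let seqs = vec![
        b"ACGTAAAC".to_vec(),
        b"CCCC".to_vec(),
        b"AAAT".to_vec(),
        b"GGAAGG".to_vec(),
        b"TTAAAA".to_vec(),
    ];
    // 2 workers, 2 recycled batches of up to 2 records each.
    assert_eq!(count_matches(seqs, b"AAA", 2, 2, 2), 3);
}
```

Because the number of batches is fixed, the reader can never run more than n_buffers batches ahead of the main thread, which bounds memory use.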
Per-record processing
The easiest to use are the following functions, which operate directly on sequence records without having to deal with record sets:
- read_process_fasta_records
- read_process_fastq_records
- read_process_fastx_records
They are specific to the given sequence format, but functions for other types can be generated using the parallel_record_impl macro.
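What "operating directly on records" means can be illustrated sequentially, without threads: a per-record work closure is applied to every record of a batch, one output value is stored per record, and the main-thread closure then receives each record together with its value, stopping as soon as it returns Some. The functions work_on_batch and consume_batch are hypothetical and are not seq_io's internals; they only sketch the adapter idea.

```rust
/// Hypothetical adapter: runs a per-record `work` closure over one batch,
/// storing one output value per record in a reusable vector.
fn work_on_batch<R, D: Default>(batch: &[R], out: &mut Vec<D>, work: impl Fn(&R, &mut D)) {
    // Reuse the allocation: one fresh default value per record.
    out.clear();
    out.resize_with(batch.len(), D::default);
    for (record, value) in batch.iter().zip(out.iter_mut()) {
        work(record, value);
    }
}

/// Hypothetical main-thread side: hands each record and its value to `func`,
/// stopping early as soon as `func` returns `Some`.
fn consume_batch<R, D, T>(
    batch: &[R],
    out: &mut [D],
    mut func: impl FnMut(&R, &mut D) -> Option<T>,
) -> Option<T> {
    for (record, value) in batch.iter().zip(out.iter_mut()) {
        if let Some(result) = func(record, value) {
            return Some(result);
        }
    }
    None
}

fn main() {
    let batch: Vec<&[u8]> = vec![&b"ACGT"[..], &b"AAAA"[..], &b"CAAAG"[..]];
    let mut found: Vec<bool> = Vec::new();
    // Worker side: flag records containing "AAA".
    work_on_batch(&batch, &mut found, |seq, hit| {
        *hit = seq.windows(3).any(|w| w == b"AAA");
    });
    // Main side: keep the flagged records and never stop early.
    let mut kept = Vec::new();
    let stop = consume_batch(&batch, &mut found, |seq, hit| {
        if *hit {
            kept.push(seq.to_vec());
        }
        None::<()>
    });
    assert!(stop.is_none());
    assert_eq!(kept, vec![b"AAAA".to_vec(), b"CAAAG".to_vec()]);
}
```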
Example
This example filters sequences by the occurrence of a pattern:
```rust
use seq_io::prelude::*;
use seq_io::fastq::{Reader, Record};
use seq_io::parallel::read_process_fastq_records;
use std::fs::File;
use std::io::BufWriter;

let reader = Reader::from_path("seqs.fastq").unwrap();
let mut writer = BufWriter::new(File::create("filtered.fastq").unwrap());

read_process_fastq_records(reader, 4, 2,
    |record, found| { // runs in worker
        *found = record.seq().windows(3).position(|s| s == b"AAA").is_some();
    },
    |record, found| { // runs in main thread
        if *found {
            record.write(&mut writer).unwrap();
        }
        // Some(value) will stop the reader, and the value will be returned.
        // In the case of never stopping, we need to give the compiler a hint about the
        // type parameter, thus the special 'turbofish' notation is needed.
        None::<()>
    }).unwrap();
```
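The None::<()> turbofish is needed because a closure that only ever returns None gives the compiler nothing from which to infer the type inside Some. The same situation can be reproduced without seq_io; for_each_until is a hypothetical function with the same stop-on-Some contract as the main-thread closure in the example above.

```rust
/// Hypothetical function with the same contract as the main-thread callback:
/// `Some(value)` stops early and is returned, `None` continues.
fn for_each_until<T>(items: &[i32], mut func: impl FnMut(i32) -> Option<T>) -> Option<T> {
    for &item in items {
        if let Some(value) = func(item) {
            return Some(value); // stop and hand the value back to the caller
        }
    }
    None
}

fn main() {
    let items = [1, 5, 8, 3];

    // Stops at the first even number; `T` is inferred as `i32`.
    let first_even = for_each_until(&items, |x| if x % 2 == 0 { Some(x) } else { None });
    assert_eq!(first_even, Some(8));

    // Never stops: without `::<()>` the compiler cannot infer `T`
    // and reports "type annotations needed".
    let mut sum = 0;
    let stopped = for_each_until(&items, |x| {
        sum += x;
        None::<()>
    });
    assert_eq!(stopped, None);
    assert_eq!(sum, 17);
}
```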
Record set processing
Record sets can also be processed directly, using the following generic functions:
- read_process_recordsets
- read_process_recordsets_init
Example
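This example searches for an occurrence of a sequence pattern and stops the parser as soon as a hit is reported. Since record sets are not necessarily returned in file order, the reported hit is not necessarily the first one in the file.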
```rust
use seq_io::prelude::*;
use seq_io::fastq;
use seq_io::parallel::read_process_recordsets;

let reader = fastq::Reader::from_path("seqs.fastq").unwrap();

read_process_recordsets(reader, 4, 2,
    |record_set, position| {
        // This function does the heavy work.
        // The code is not necessarily very efficient, just for demonstration.
        // Start with "no hit"; the value may be reused from a previous record set.
        *position = None;
        for (i, record) in record_set.into_iter().enumerate() {
            if let Some(pos) = record.seq().windows(3).position(|s| s == b"AAA") {
                // Keep the first hit in this record set and stop searching it.
                *position = Some((i, pos));
                break;
            }
        }
    },
    |mut record_sets| {
        // This function runs in the main thread. It provides a streaming iterator over
        // record sets and the corresponding return values from the worker function
        // (not necessarily in the same order as in the file).
        while let Some(result) = record_sets.next() {
            let (record_set, position) = result?;
            if let Some(&(i, pos)) = position.as_ref() {
                let record = record_set.into_iter().nth(i).unwrap();
                println!("Found AAA in record {} at position {}", record.id().unwrap(), pos);
                return Ok(());
            }
        }
        // Here, we need to give the compiler a type hint about the returned
        // result, since it is not smart enough to infer it.
        // In real-world programs, this may be less of an issue because the
        // returned result type is often known.
        Ok::<_, fastq::Error>(())
    }
).expect("FASTQ reading error");
```
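Because results reach the main thread in completion order, the example above reports whichever hit arrives first. If the first hit in file order is required, one option (not a seq_io feature) is to number the batches and buffer results until every earlier batch has reported. The following is a std-only sketch under these assumptions: first_match_in_order is a hypothetical name, the input is already split into batches, one scoped thread per batch stands in for a worker pool, and std::thread::scope requires Rust 1.63 or later.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: returns (batch index, record index, position) of the
/// first match of a non-empty `pattern` in file order.
fn first_match_in_order(batches: &[Vec<&[u8]>], pattern: &[u8]) -> Option<(usize, usize, usize)> {
    let (tx, rx) = mpsc::channel();
    thread::scope(|s| {
        for (b, batch) in batches.iter().enumerate() {
            let tx = tx.clone();
            s.spawn(move || {
                // First hit inside this batch, if any.
                let hit = batch.iter().enumerate().find_map(|(i, seq)| {
                    seq.windows(pattern.len())
                        .position(|w| w == pattern)
                        .map(|pos| (i, pos))
                });
                // The receiver may be gone after an early return; ignore that.
                let _ = tx.send((b, hit));
            });
        }
        drop(tx);

        // Buffer out-of-order results until all earlier batches are known.
        let mut pending = BTreeMap::new();
        let mut next = 0; // next batch index in file order
        for (b, hit) in rx {
            pending.insert(b, hit);
            while let Some(hit) = pending.remove(&next) {
                if let Some((i, pos)) = hit {
                    return Some((next, i, pos)); // earliest hit in file order
                }
                next += 1;
            }
        }
        None
    })
}

fn main() {
    let batches: Vec<Vec<&[u8]>> = vec![
        vec![&b"CCCC"[..], &b"GGGG"[..]],
        vec![&b"TAAAT"[..], &b"ACGT"[..]],
        vec![&b"AAAC"[..]],
    ];
    // Batch 2 may finish first, but batch 1 holds the earlier hit.
    assert_eq!(first_match_in_order(&batches, b"AAA"), Some((1, 0, 1)));
}
```

The trade-off is that a hit found in a late batch cannot be reported until all earlier batches have finished.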
Structs
ParallelDataSets
Traits
RecordSetReader: A simple trait required to be implemented for readers fed into the functions in this module.
Functions
read_process_fasta_records: This function wraps …
read_process_fasta_records_init: Like …
read_process_fastq_records: This function wraps …
read_process_fastq_records_init: Like …
read_process_fastx_records: This function wraps …
read_process_fastx_records_init: Like …
read_process_records_init: Using this function currently does not work due to a compiler bug.
read_process_recordsets: This function reads record sets and processes them in parallel threads.
read_process_recordsets_init: Like …
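Since record sets are recycled back to the background reader, one plausible shape for a reader trait like RecordSetReader is "refill the record set you are handed, and report whether anything was read". The sketch below illustrates only that idea with a hypothetical trait BatchFiller and hypothetical types LineSet and LineReader that batch plain text lines; it does not reproduce RecordSetReader's actual signature, which should be taken from the seq_io documentation.

```rust
use std::io::{self, BufRead};

/// Hypothetical trait (not seq_io's `RecordSetReader`): a reader that refills
/// a caller-provided, reusable record set.
trait BatchFiller {
    type RecordSet: Default + Send;
    /// Fills `set` with the next records; returns `Ok(false)` at end of input.
    fn fill(&mut self, set: &mut Self::RecordSet) -> io::Result<bool>;
}

/// A reusable batch of lines: one byte buffer plus the end offset of each line.
#[derive(Default)]
struct LineSet {
    buf: Vec<u8>,
    ends: Vec<usize>,
}

impl LineSet {
    fn lines(&self) -> Vec<&[u8]> {
        let mut start = 0;
        let mut out = Vec::with_capacity(self.ends.len());
        for &end in &self.ends {
            out.push(&self.buf[start..end]);
            start = end;
        }
        out
    }
}

/// Reads up to `batch_size` lines per record set from any `BufRead` source.
struct LineReader<R> {
    inner: R,
    batch_size: usize,
}

impl<R: BufRead> BatchFiller for LineReader<R> {
    type RecordSet = LineSet;

    fn fill(&mut self, set: &mut LineSet) -> io::Result<bool> {
        // Keep the allocations, discard the previous contents.
        set.buf.clear();
        set.ends.clear();
        while set.ends.len() < self.batch_size {
            let n = self.inner.read_until(b'\n', &mut set.buf)?;
            if n == 0 {
                break; // end of input
            }
            if set.buf.last() == Some(&b'\n') {
                set.buf.pop(); // strip the trailing '\n'
            }
            set.ends.push(set.buf.len());
        }
        Ok(!set.ends.is_empty())
    }
}

fn main() -> io::Result<()> {
    let data: &[u8] = b"ACGT\nAAAC\nGGTT\n";
    let mut reader = LineReader { inner: data, batch_size: 2 };
    let mut set = LineSet::default(); // allocated once, refilled each round
    let mut batches = Vec::new();
    while reader.fill(&mut set)? {
        let lines: Vec<Vec<u8>> = set.lines().iter().map(|l| l.to_vec()).collect();
        batches.push(lines);
    }
    assert_eq!(batches.len(), 2);
    assert_eq!(batches[1], vec![b"GGTT".to_vec()]);
    Ok(())
}
```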