seq_io_parallel 0.2.1

A map-reduce style parallel extension to seq_io
Documentation
# seq_io_parallel

A parallel processing extension for the [`seq_io`](https://github.com/markschl/seq_io) crate, providing an ergonomic API for parallel FASTA/FASTQ file processing.

For an alternative implementation with native paired-end support see [`paraseq`](https://github.com/noamteyssier/paraseq).

## Overview

While `seq_io` includes parallel implementations for both FASTQ and FASTA readers, this library offers an alternative approach with a potentially more ergonomic API that is not reliant on closures.
The implementation follows a Map-Reduce style of parallelism that emphasizes clarity and ease of use.

This cannot support paired-end processing currently.

## Key Features

- Single-producer multi-consumer parallel processing pipeline
- Map-Reduce style processing architecture
- Support for both FASTA and FASTQ formats
- Thread-safe stateful processing
- Efficient memory management with reusable record sets

## Architecture

The library implements a parallel processing pipeline with the following components:

1. **Reader Thread**: A dedicated thread that continuously fills a limited set of `RecordSets` until EOF
2. **Worker Threads**: Multiple threads that process ready `RecordSets` in parallel
3. **Record Processing**: While `RecordSets` may be processed out of order, records within each set maintain their sequence

## Implementation

### The ParallelProcessor Traits

To use parallel processing, implement one of the following traits:

```rust
// For single-file processing
pub trait ParallelProcessor: Send + Clone {
    // Map: Process individual records
    fn process_record<'a, Rf: MinimalRefRecord<'a>>(&mut self, record: Rf) -> Result<()>;

    // Reduce: Process completed batches (optional)
    fn on_batch_complete(&mut self) -> Result<()> {
        Ok(())
    }
}
```

### Record Access

Both FASTA and FASTQ records are accessed through the `MinimalRefRecord` trait:

```rust
pub trait MinimalRefRecord<'a> {
    fn ref_head(&self) -> &[u8];  // Header data
    fn ref_seq(&self) -> &[u8];   // Sequence data
    fn ref_qual(&self) -> &[u8];  // Quality scores (empty for FASTA)
}
```

### Hooking into the Parallel Processing

This implementation allows for hooking into different stages of the processing pipeline:

1. **Record Processing**: Implement the `process_record` method to process individual records.
2. **Batch Completion**: Implement the `on_batch_complete` method to perform an operation after each batch (optional).
3. **Thread Completion**: Implement the `on_thread_complete` method to perform an operation after all batches within a thread (optional).
4. **Get and Set Thread ID**: Implement the `get_thread_id` and `set_thread_id` methods to access the thread ID (optional).

## Usage Examples

### Single-File Processing

Here's a simple example that performs parallel processing of a FASTQ file:

```rust
use anyhow::Result;
use seq_io::fastq;
use seq_io_parallel::{MinimalRefRecord, ParallelProcessor, ParallelReader};
use std::sync::{atomic::AtomicUsize, Arc};

#[derive(Clone, Default)]
pub struct ExpensiveCalculation {
    local_sum: usize,
    global_sum: Arc<AtomicUsize>,
}

impl ParallelProcessor for ExpensiveCalculation {
    fn process_record<'a, Rf: MinimalRefRecord<'a>>(&mut self, record: Rf) -> Result<()> {
        let seq = record.ref_seq();
        let qual = record.ref_qual();

        // Simulate expensive calculation
        for _ in 0..100 {
            for (s, q) in seq.iter().zip(qual.iter()) {
                self.local_sum += (*s - 33) as usize + (*q - 33) as usize;
            }
        }
        Ok(())
    }

    fn on_batch_complete(&mut self) -> Result<()> {
        self.global_sum
            .fetch_add(self.local_sum, std::sync::atomic::Ordering::Relaxed);
        self.local_sum = 0;
        Ok(())
    }
}

fn main() -> Result<()> {
    let path = std::env::args().nth(1).expect("No path provided");
    let num_threads = std::env::args()
        .nth(2)
        .map(|n| n.parse().unwrap())
        .unwrap_or(1);

    let (handle, _) = niffler::send::from_path(&path)?;
    let reader = fastq::Reader::new(handle);
    let processor = ExpensiveCalculation::default();

    reader.process_parallel(processor.clone(), num_threads)?;
    Ok(())
}
```

## Performance Considerations

FASTA/FASTQ processing is typically I/O-bound, so parallel processing benefits may vary:

- Best for computationally expensive operations (e.g., alignment, k-mer counting)
- Performance gains depend on the ratio of I/O to processing time
- Consider using `Arc` for processor state with heavy initialization costs

## Implementation Notes

- Each worker thread receives a `Clone` of the `ParallelProcessor`
- Thread-local state can be maintained without locks
- Global state should use appropriate synchronization (e.g., `Arc<AtomicUsize>`)
- Heavy initialization costs can be mitigated by wrapping in `Arc`

## Future Work

Currently this library is making use of `anyhow` for all error handling.
This is not ideal for custom error types in libraries, but for many CLI tools will work just fine.
In the future this may change.