Trait ParallelProcessor

pub trait ParallelProcessor: Send + Clone {
    // Required method
    fn process_record(&mut self, record: Record) -> Result<()>;

    // Provided methods
    fn on_batch_complete(&mut self) -> Result<()> { ... }
    fn set_tid(&mut self, tid: usize) { ... }
    fn get_tid(&self) -> Option<usize> { ... }
}

Trait for types that can process records in parallel.

This trait defines how individual records are processed and how thread-local results are aggregated. Implementors must be Send + Clone so that each worker thread can be given its own copy of the processor.

The processing model follows a batch-oriented approach:

  1. Each thread receives its own clone of the processor
  2. Records are processed individually via process_record
  3. After processing a batch, on_batch_complete is called for aggregation
  4. This cycle repeats until all records are processed
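The four steps above can be sketched as a small driver built on std::thread::scope. This is not ibu's actual scheduler: run_batches, Tally, the contiguous per-thread chunking, and the simplified Record (only barcode and index) are hypothetical, and the trait is a local mirror of the documented signature with the documented defaults.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Simplified stand-in for ibu::Record (only the fields the examples use).
#[derive(Clone, Copy, Debug)]
pub struct Record {
    pub barcode: u64,
    pub index: u64,
}

// String errors stand in for ibu's error type.
pub type Result<T> = std::result::Result<T, String>;

// Local mirror of the documented trait, with the documented defaults.
pub trait ParallelProcessor: Send + Clone {
    fn process_record(&mut self, record: Record) -> Result<()>;
    fn on_batch_complete(&mut self) -> Result<()> {
        Ok(())
    }
    fn set_tid(&mut self, _tid: usize) {}
    fn get_tid(&self) -> Option<usize> {
        None
    }
}

/// Hypothetical driver: splits `records` into one contiguous chunk per thread,
/// and each thread walks its chunk in batches of `batch_size`.
pub fn run_batches<P: ParallelProcessor>(
    processor: &P,
    records: &[Record],
    n_threads: usize,
    batch_size: usize,
) -> Result<()> {
    let n = n_threads.max(1);
    let chunk_len = ((records.len() + n - 1) / n).max(1);
    thread::scope(|s| {
        let handles: Vec<_> = records
            .chunks(chunk_len)
            .enumerate()
            .map(|(tid, chunk)| {
                // 1. Each thread receives its own clone of the processor.
                let mut local = processor.clone();
                s.spawn(move || -> Result<()> {
                    local.set_tid(tid);
                    for batch in chunk.chunks(batch_size.max(1)) {
                        // 2. Records are processed individually.
                        for &record in batch {
                            local.process_record(record)?;
                        }
                        // 3. Aggregate once the batch is done.
                        local.on_batch_complete()?;
                    }
                    Ok(())
                })
            })
            .collect();
        // 4. The cycle ends when every thread has exhausted its chunk.
        handles
            .into_iter()
            .try_for_each(|h| h.join().expect("worker thread panicked"))
    })
}

/// Hypothetical processor that counts records and completed batches.
#[derive(Clone, Default)]
pub struct Tally {
    local: u64,
    pub records: Arc<Mutex<u64>>,
    pub batches: Arc<Mutex<u64>>,
}

impl ParallelProcessor for Tally {
    fn process_record(&mut self, _record: Record) -> Result<()> {
        self.local += 1;
        Ok(())
    }

    fn on_batch_complete(&mut self) -> Result<()> {
        *self.records.lock().unwrap() += self.local;
        *self.batches.lock().unwrap() += 1;
        self.local = 0;
        Ok(())
    }
}
```

With 10 records, 2 threads, and a batch size of 3, each thread gets 5 records and runs two batches (3 + 2), so on_batch_complete fires four times in total.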

Thread Safety

Processors must be Send and Clone. Each thread gets its own clone, so process_record needs no explicit synchronization. To aggregate results across threads, keep shared state behind a reference-counted handle such as Arc<Mutex<T>> and update it in on_batch_complete: cloning an Arc copies the pointer rather than the data, so every thread's clone refers to the same value.
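The difference between plain fields and Arc fields under Clone can be checked directly. In this sketch (no ibu dependency; Counter and count_in_threads are hypothetical), each thread mutates its own clone's local field without locking, and a single flush per thread lands in the one shared total.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical counter with one per-clone field and one shared field.
#[derive(Clone, Default)]
pub struct Counter {
    pub local: u64,              // copied by Clone: each clone has its own
    pub shared: Arc<Mutex<u64>>, // Arc::clone copies the pointer: one value for all
}

impl Counter {
    /// Moves `local` into the shared total, as `on_batch_complete` would.
    pub fn flush(&mut self) {
        *self.shared.lock().unwrap() += self.local;
        self.local = 0;
    }
}

/// Gives each of `n_threads` threads its own clone and bumps it `per_thread` times.
pub fn count_in_threads(n_threads: usize, per_thread: u64) -> (u64, Counter) {
    let original = Counter::default();
    let handles: Vec<_> = (0..n_threads)
        .map(|_| {
            let mut clone = original.clone();
            thread::spawn(move || {
                for _ in 0..per_thread {
                    clone.local += 1; // thread-local: no lock needed
                }
                clone.flush(); // a single lock acquisition per flush
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let total = *original.shared.lock().unwrap();
    (total, original)
}
```

The original's local field stays at zero, because the increments happened on the clones; only the Arc-backed total reflects their work.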

Examples

Simple Record Counter

use ibu::{ParallelProcessor, Record};
use std::sync::{Arc, Mutex};

#[derive(Clone, Default)]
struct RecordCounter {
    local_count: u64,
    global_count: Arc<Mutex<u64>>,
}

impl ParallelProcessor for RecordCounter {
    fn process_record(&mut self, _record: Record) -> ibu::Result<()> {
        self.local_count += 1;
        Ok(())
    }

    fn on_batch_complete(&mut self) -> ibu::Result<()> {
        let mut guard = self.global_count.lock().unwrap();
        *guard += self.local_count;
        self.local_count = 0;
        Ok(())
    }
}
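To read the result, one option is to keep one clone of the counter as a handle before the processor is handed to the workers; since every clone shares the same Arc, the handle sees the full total once all clones have flushed. The sketch below replays batches sequentially on fresh clones, against a trimmed local mirror of the trait (set_tid and get_tid omitted) and a simplified Record; the total method and the simulate helper are hypothetical additions.

```rust
use std::sync::{Arc, Mutex};

// Simplified stand-in for ibu::Record.
#[derive(Clone, Copy, Debug)]
pub struct Record {
    pub barcode: u64,
    pub index: u64,
}

pub type Result<T> = std::result::Result<T, String>;

// Trimmed local mirror of the trait (set_tid/get_tid omitted).
pub trait ParallelProcessor: Send + Clone {
    fn process_record(&mut self, record: Record) -> Result<()>;
    fn on_batch_complete(&mut self) -> Result<()> {
        Ok(())
    }
}

#[derive(Clone, Default)]
pub struct RecordCounter {
    local_count: u64,
    global_count: Arc<Mutex<u64>>,
}

impl RecordCounter {
    /// Hypothetical accessor: the total is complete only after every clone has flushed.
    pub fn total(&self) -> u64 {
        *self.global_count.lock().unwrap()
    }
}

impl ParallelProcessor for RecordCounter {
    fn process_record(&mut self, _record: Record) -> Result<()> {
        self.local_count += 1;
        Ok(())
    }

    fn on_batch_complete(&mut self) -> Result<()> {
        let mut guard = self.global_count.lock().unwrap();
        *guard += self.local_count;
        self.local_count = 0;
        Ok(())
    }
}

/// Hypothetical helper: replays each batch on a fresh clone, as if each ran on
/// its own thread, and returns the handle that was never given to a worker.
pub fn simulate(batches: &[&[Record]]) -> Result<RecordCounter> {
    let handle = RecordCounter::default();
    for batch in batches {
        let mut worker = handle.clone();
        for &record in *batch {
            worker.process_record(record)?;
        }
        worker.on_batch_complete()?;
    }
    Ok(handle)
}
```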

Barcode Analysis

use ibu::{ParallelProcessor, Record};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Clone)]
struct BarcodeAnalyzer {
    local_stats: HashMap<u64, u64>,
    global_stats: Arc<Mutex<HashMap<u64, u64>>>,
}

impl ParallelProcessor for BarcodeAnalyzer {
    fn process_record(&mut self, record: Record) -> ibu::Result<()> {
        *self.local_stats.entry(record.barcode).or_insert(0) += 1;
        Ok(())
    }

    fn on_batch_complete(&mut self) -> ibu::Result<()> {
        let mut global = self.global_stats.lock().unwrap();
        for (barcode, count) in self.local_stats.drain() {
            *global.entry(barcode).or_insert(0) += count;
        }
        Ok(())
    }
}
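Because HashMap::drain empties local_stats while moving its entries out, each flush hands over exactly the counts gathered since the previous batch, and the map keeps its allocated capacity for the next one. The sketch below checks that two clones merge into a single table and adds a hypothetical top_barcode query; Record and the trait are simplified local stand-ins, and the analyzer derives Default here for convenience.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Simplified stand-in for ibu::Record.
#[derive(Clone, Copy, Debug)]
pub struct Record {
    pub barcode: u64,
    pub index: u64,
}

pub type Result<T> = std::result::Result<T, String>;

// Trimmed local mirror of the trait (set_tid/get_tid omitted).
pub trait ParallelProcessor: Send + Clone {
    fn process_record(&mut self, record: Record) -> Result<()>;
    fn on_batch_complete(&mut self) -> Result<()> {
        Ok(())
    }
}

#[derive(Clone, Default)]
pub struct BarcodeAnalyzer {
    local_stats: HashMap<u64, u64>,
    pub global_stats: Arc<Mutex<HashMap<u64, u64>>>,
}

impl ParallelProcessor for BarcodeAnalyzer {
    fn process_record(&mut self, record: Record) -> Result<()> {
        *self.local_stats.entry(record.barcode).or_insert(0) += 1;
        Ok(())
    }

    fn on_batch_complete(&mut self) -> Result<()> {
        let mut global = self.global_stats.lock().unwrap();
        // drain() empties the local map but keeps its capacity for the next batch.
        for (barcode, count) in self.local_stats.drain() {
            *global.entry(barcode).or_insert(0) += count;
        }
        Ok(())
    }
}

impl BarcodeAnalyzer {
    /// Hypothetical query: most frequent barcode (ties go to the lower barcode).
    pub fn top_barcode(&self) -> Option<(u64, u64)> {
        let global = self.global_stats.lock().unwrap();
        let top = global
            .iter()
            .map(|(&barcode, &count)| (barcode, count))
            .max_by(|a, b| a.1.cmp(&b.1).then(b.0.cmp(&a.0)));
        top
    }
}
```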

Required Methods

fn process_record(&mut self, record: Record) -> Result<()>

Processes a single record.

This method is called once for every record in the dataset, which in typical genomics workflows means millions of calls, so it should be cheap and avoid locking or other heavy synchronization.

Thread-local state can be accumulated here and flushed in on_batch_complete.

Arguments
  • record - The record to process

Errors

Return an error if the record cannot be processed. Returning an error stops the entire parallel processing operation.
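As a sketch of the error path, the hypothetical Validator below rejects any record whose barcode exceeds a configured max_barcode, and the sequential process_all helper (a stand-in for the parallel driver, not an ibu function) propagates the first Err with ?, so later records are never processed. The validation rule, the String error type, and both names are assumptions for illustration.

```rust
// Simplified stand-in for ibu::Record.
#[derive(Clone, Copy, Debug)]
pub struct Record {
    pub barcode: u64,
    pub index: u64,
}

// String errors stand in for ibu's error type.
pub type Result<T> = std::result::Result<T, String>;

// Trimmed local mirror of the trait (set_tid/get_tid omitted).
pub trait ParallelProcessor: Send + Clone {
    fn process_record(&mut self, record: Record) -> Result<()>;
    fn on_batch_complete(&mut self) -> Result<()> {
        Ok(())
    }
}

/// Hypothetical processor that rejects out-of-range barcodes.
#[derive(Clone)]
pub struct Validator {
    pub max_barcode: u64,
    pub seen: u64,
}

impl ParallelProcessor for Validator {
    fn process_record(&mut self, record: Record) -> Result<()> {
        if record.barcode > self.max_barcode {
            // Hypothetical rule: an out-of-range barcode is fatal.
            return Err(format!(
                "record {}: barcode {} exceeds {}",
                record.index, record.barcode, self.max_barcode
            ));
        }
        self.seen += 1;
        Ok(())
    }
}

/// Sequential stand-in for the parallel driver: `?` stops at the first error.
pub fn process_all<P: ParallelProcessor>(processor: &mut P, records: &[Record]) -> Result<()> {
    for &record in records {
        processor.process_record(record)?;
    }
    processor.on_batch_complete()
}
```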

Provided Methods

fn on_batch_complete(&mut self) -> Result<()>

Called when a thread finishes processing a batch of records.

This is the appropriate place to:

  • Flush thread-local accumulators to shared state
  • Perform expensive operations that don’t need to happen per-record
  • Update progress indicators

The default implementation does nothing.
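For progress indicators, an atomic counter avoids a mutex entirely. In this sketch the hypothetical ProgressTracker counts records in a plain field and publishes them with one fetch_add per batch; last_reported stands in for whatever a real tool would print or feed to a progress bar. Record and the trait are simplified local stand-ins.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Simplified stand-in for ibu::Record.
#[derive(Clone, Copy, Debug)]
pub struct Record {
    pub barcode: u64,
    pub index: u64,
}

pub type Result<T> = std::result::Result<T, String>;

// Trimmed local mirror of the trait (set_tid/get_tid omitted).
pub trait ParallelProcessor: Send + Clone {
    fn process_record(&mut self, record: Record) -> Result<()>;
    fn on_batch_complete(&mut self) -> Result<()> {
        Ok(())
    }
}

/// Hypothetical processor that publishes progress once per batch.
#[derive(Clone, Default)]
pub struct ProgressTracker {
    since_flush: u64,
    pub last_reported: u64,
    pub processed: Arc<AtomicU64>,
}

impl ParallelProcessor for ProgressTracker {
    fn process_record(&mut self, _record: Record) -> Result<()> {
        self.since_flush += 1; // cheap, thread-local increment
        Ok(())
    }

    fn on_batch_complete(&mut self) -> Result<()> {
        // One atomic add per batch; Relaxed suffices for a counter that is only
        // read for display. fetch_add returns the value before the addition.
        let before = self.processed.fetch_add(self.since_flush, Ordering::Relaxed);
        self.last_reported = before + self.since_flush; // e.g. feed a progress bar
        self.since_flush = 0;
        Ok(())
    }
}
```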

Errors

Return an error if aggregation fails. Returning an error stops the entire parallel processing operation.

Examples

use ibu::{ParallelProcessor, Record};
use std::sync::{Arc, Mutex};

#[derive(Clone)]
struct StatCollector {
    local_sum: u64,
    global_sum: Arc<Mutex<u64>>,
}

impl ParallelProcessor for StatCollector {
    fn process_record(&mut self, record: Record) -> ibu::Result<()> {
        self.local_sum += record.index;
        Ok(())
    }

    fn on_batch_complete(&mut self) -> ibu::Result<()> {
        if self.local_sum > 0 {
            let mut guard = self.global_sum.lock().unwrap();
            *guard += self.local_sum;
            self.local_sum = 0;
        }
        Ok(())
    }
}

fn set_tid(&mut self, tid: usize)

Sets the thread ID for this processor instance.

Called once per thread before processing begins. This can be useful for:

  • Thread-specific logging or debugging
  • Implementing thread-aware algorithms
  • Performance profiling per thread

The default implementation does nothing.

Arguments
  • tid - Thread ID (0-based index)
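One thread-aware pattern is to give each thread its own output slot, indexed by the ID passed to set_tid, so workers never contend for the same lock. This sketch assumes the thread count is known up front; PerThreadCounter and its Vec<AtomicU64> layout are hypothetical, and the trait is a local mirror with the documented defaults.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Simplified stand-in for ibu::Record.
#[derive(Clone, Copy, Debug)]
pub struct Record {
    pub barcode: u64,
    pub index: u64,
}

pub type Result<T> = std::result::Result<T, String>;

// Local mirror of the documented trait, with the documented defaults.
pub trait ParallelProcessor: Send + Clone {
    fn process_record(&mut self, record: Record) -> Result<()>;
    fn on_batch_complete(&mut self) -> Result<()> {
        Ok(())
    }
    fn set_tid(&mut self, _tid: usize) {}
    fn get_tid(&self) -> Option<usize> {
        None
    }
}

/// Hypothetical processor that writes only to the slot owned by its thread.
#[derive(Clone)]
pub struct PerThreadCounter {
    tid: Option<usize>,
    local: u64,
    pub slots: Arc<Vec<AtomicU64>>, // one slot per thread
}

impl PerThreadCounter {
    pub fn new(n_threads: usize) -> Self {
        let slots: Vec<AtomicU64> = (0..n_threads).map(|_| AtomicU64::new(0)).collect();
        Self { tid: None, local: 0, slots: Arc::new(slots) }
    }
}

impl ParallelProcessor for PerThreadCounter {
    fn process_record(&mut self, _record: Record) -> Result<()> {
        self.local += 1;
        Ok(())
    }

    fn on_batch_complete(&mut self) -> Result<()> {
        // `?` converts the &str messages into the String error type.
        let tid = self.tid.ok_or("set_tid was not called")?;
        let slot = self.slots.get(tid).ok_or("tid out of range")?;
        slot.fetch_add(self.local, Ordering::Relaxed);
        self.local = 0;
        Ok(())
    }

    fn set_tid(&mut self, tid: usize) {
        self.tid = Some(tid);
    }

    fn get_tid(&self) -> Option<usize> {
        self.tid
    }
}
```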

fn get_tid(&self) -> Option<usize>

Returns the thread ID for this processor instance.

Returns None by default. Override this together with set_tid if you store the thread ID.
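The difference between the defaults and an override is easy to check. In this sketch Untracked keeps the documented defaults (set_tid does nothing, get_tid returns None), while the hypothetical Tracked stores the ID in an Option<usize> field; the trait is a local mirror of the documented signature.

```rust
// Simplified stand-in for ibu::Record.
#[derive(Clone, Copy, Debug)]
pub struct Record {
    pub barcode: u64,
    pub index: u64,
}

pub type Result<T> = std::result::Result<T, String>;

// Local mirror of the documented trait, with the documented defaults.
pub trait ParallelProcessor: Send + Clone {
    fn process_record(&mut self, record: Record) -> Result<()>;
    fn on_batch_complete(&mut self) -> Result<()> {
        Ok(())
    }
    fn set_tid(&mut self, _tid: usize) {}
    fn get_tid(&self) -> Option<usize> {
        None
    }
}

/// Relies on the provided defaults for set_tid and get_tid.
#[derive(Clone, Default)]
pub struct Untracked;

impl ParallelProcessor for Untracked {
    fn process_record(&mut self, _record: Record) -> Result<()> {
        Ok(())
    }
}

/// Hypothetical processor that remembers the ID it was given.
#[derive(Clone, Default)]
pub struct Tracked {
    tid: Option<usize>,
}

impl ParallelProcessor for Tracked {
    fn process_record(&mut self, _record: Record) -> Result<()> {
        Ok(())
    }

    fn set_tid(&mut self, tid: usize) {
        self.tid = Some(tid);
    }

    fn get_tid(&self) -> Option<usize> {
        self.tid
    }
}
```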

Dyn Compatibility

This trait is not dyn compatible, because its Clone supertrait requires Sized.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.
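Since a Box<dyn ParallelProcessor> cannot be formed, code that accepts any processor takes a generic parameter instead. When the processor has to be chosen at runtime, one common workaround (a sketch, not an ibu API) is an enum whose variants wrap the concrete processors and which forwards the trait methods it needs. Count, SumIndex, AnyProcessor, and run_all are hypothetical, and Record and the trait are simplified local stand-ins.

```rust
// Simplified stand-in for ibu::Record.
#[derive(Clone, Copy, Debug)]
pub struct Record {
    pub barcode: u64,
    pub index: u64,
}

pub type Result<T> = std::result::Result<T, String>;

// Trimmed local mirror of the trait (set_tid/get_tid omitted).
pub trait ParallelProcessor: Send + Clone {
    fn process_record(&mut self, record: Record) -> Result<()>;
    fn on_batch_complete(&mut self) -> Result<()> {
        Ok(())
    }
}

#[derive(Clone, Default)]
pub struct Count {
    pub n: u64,
}

#[derive(Clone, Default)]
pub struct SumIndex {
    pub total: u64,
}

impl ParallelProcessor for Count {
    fn process_record(&mut self, _record: Record) -> Result<()> {
        self.n += 1;
        Ok(())
    }
}

impl ParallelProcessor for SumIndex {
    fn process_record(&mut self, record: Record) -> Result<()> {
        self.total += record.index;
        Ok(())
    }
}

/// Enum wrapper: picks a processor at runtime without a trait object.
#[derive(Clone)]
pub enum AnyProcessor {
    Count(Count),
    SumIndex(SumIndex),
}

impl ParallelProcessor for AnyProcessor {
    fn process_record(&mut self, record: Record) -> Result<()> {
        match self {
            AnyProcessor::Count(p) => p.process_record(record),
            AnyProcessor::SumIndex(p) => p.process_record(record),
        }
    }

    fn on_batch_complete(&mut self) -> Result<()> {
        match self {
            AnyProcessor::Count(p) => p.on_batch_complete(),
            AnyProcessor::SumIndex(p) => p.on_batch_complete(),
        }
    }
}

/// Statically dispatched entry point, used where `&mut dyn ParallelProcessor` is impossible.
pub fn run_all<P: ParallelProcessor>(processor: &mut P, records: &[Record]) -> Result<()> {
    for &record in records {
        processor.process_record(record)?;
    }
    processor.on_batch_complete()
}
```

The enum is itself Send + Clone whenever its variants are, so it satisfies the trait's bounds and can be passed anywhere a concrete processor can.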

Implementors