genomicframe-core 0.2.0

High-performance genomics I/O and interoperability layer
Documentation
//! Parallel processing infrastructure for genomic data
//!
//! This module provides traits and utilities for parallel computation
//! of statistics across large genomic files using Rayon.

use crate::error::Result;
use lazy_static::lazy_static;
use rayon::ThreadPool;
use std::collections::HashMap;
use std::io::{BufRead, Seek, SeekFrom};
use std::sync::{Arc, Mutex};

/// Trait for statistics that can be merged from parallel computations
///
/// This is the key trait that enables parallel processing:
/// 1. Each thread computes partial statistics on its chunk
/// 2. Partial statistics are merged at the end
///
/// # Properties
/// For correct parallel computation, merge must be:
/// - **Associative**: (a ⊕ b) ⊕ c = a ⊕ (b ⊕ c)
/// - **Commutative**: a ⊕ b = b ⊕ a
///
/// Where ⊕ represents the merge operation.
pub trait Mergeable: Sized + Send {
    /// Merge another stats object into this one
    ///
    /// After merging, `self` should contain the combined statistics
    /// as if both datasets were processed together.
    fn merge(&mut self, other: Self);

    /// Merge multiple stats objects into one (convenience method)
    fn merge_all(stats: Vec<Self>) -> Option<Self> {
        let mut iter = stats.into_iter();
        let mut result = iter.next()?;
        for stat in iter {
            result.merge(stat);
        }
        Some(result)
    }
}

/// Configuration for parallel computation
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Number of threads to use (default: number of CPUs)
    pub num_threads: Option<usize>,

    /// Minimum chunk size in bytes (default: 5 MB)
    /// Smaller chunks = better load balancing and cache efficiency
    /// Benchmarks show 1-5 MB is optimal for most workloads
    pub min_chunk_size: usize,

    /// Maximum number of chunks (default: num_threads * 4)
    /// Limits memory usage for very large files
    pub max_chunks: Option<usize>,
}

impl Default for ParallelConfig {
    fn default() -> Self {
        Self {
            num_threads: None, // Use all CPUs
            min_chunk_size: 5 * 1024 * 1024, // 5 MB (optimal for cache efficiency)
            max_chunks: None,
        }
    }
}

impl ParallelConfig {
    /// Create a new config with all defaults
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the number of threads to use
    pub fn with_threads(mut self, threads: usize) -> Self {
        self.num_threads = Some(threads);
        self
    }

    /// Set the minimum chunk size in bytes
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.min_chunk_size = size;
        self
    }

    /// Set the maximum number of chunks
    pub fn with_max_chunks(mut self, max: usize) -> Self {
        self.max_chunks = Some(max);
        self
    }

    /// Get the effective number of threads
    pub fn threads(&self) -> usize {
        self.num_threads.unwrap_or_else(num_cpus::get)
    }

    /// Get the effective max chunks
    pub fn max_chunks(&self) -> usize {
        self.max_chunks.unwrap_or_else(|| self.threads() * 4)
    }
}


lazy_static! {
    static ref THREAD_POOL_CACHE: Mutex<HashMap<usize, Arc<ThreadPool>>> = Mutex::new(HashMap::new());
}

/// Get or create a thread pool with the specified number of threads
///
/// Thread pools are cached globally to avoid creation overhead on repeated calls.
/// Creating a thread pool can take 10-40ms depending on the thread count, so
/// caching provides significant performance benefits.
///
/// # Arguments
/// * `num_threads` - Number of threads for the pool (must be > 0)
///
/// # Returns
/// A reference-counted handle to the thread pool
pub fn get_thread_pool(num_threads: usize) -> Result<Arc<ThreadPool>> {
    let mut cache = THREAD_POOL_CACHE
        .lock()
        .map_err(|e| crate::error::Error::InvalidInput(format!("Thread pool cache poisoned: {}", e)))?;

    // Check if we already have a pool for this thread count
    if let Some(pool) = cache.get(&num_threads) {
        // Clone the pool handle (cheap, just Arc increment)
        return Ok(Arc::clone(pool));
    }

    // Create new pool
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .build()
        .map_err(|e| crate::error::Error::InvalidInput(format!("Failed to create thread pool: {}", e)))?;

    // Cache it
    let cached_pool = Arc::new(pool);
    cache.insert(num_threads, Arc::clone(&cached_pool));

    Ok(cached_pool)
}

/// Represents a byte range in a file for parallel processing
#[derive(Debug, Clone, Copy)]
pub struct FileChunk {
    /// Starting byte offset
    pub start: u64,
    /// Ending byte offset (exclusive)
    pub end: u64,
    /// Chunk index (for debugging)
    pub index: usize,
}

impl FileChunk {
    /// Create a new file chunk
    pub fn new(start: u64, end: u64, index: usize) -> Self {
        Self { start, end, index }
    }

    /// Get the size of this chunk in bytes
    pub fn size(&self) -> u64 {
        self.end - self.start
    }
}

/// Calculate file chunks for parallel processing
///
/// Divides a file into roughly equal-sized chunks, respecting minimum size
/// and maximum count constraints.
pub fn calculate_chunks(file_size: u64, config: &ParallelConfig) -> Vec<FileChunk> {
    if file_size == 0 {
        return vec![];
    }

    let num_threads = config.threads();
    let min_chunk_size = config.min_chunk_size as u64;
    let max_chunks = config.max_chunks();

    // Determine number of chunks
    let ideal_chunks = num_threads;
    let max_chunks_by_size = (file_size / min_chunk_size).max(1) as usize;
    let num_chunks = ideal_chunks.min(max_chunks_by_size).min(max_chunks);

    // Calculate chunk size
    let chunk_size = file_size / num_chunks as u64;

    // Create chunks
    (0..num_chunks)
        .map(|i| {
            let start = i as u64 * chunk_size;
            let end = if i == num_chunks - 1 {
                file_size // Last chunk gets remainder
            } else {
                (i + 1) as u64 * chunk_size
            };
            FileChunk::new(start, end, i)
        })
        .collect()
}


/// Find the start of the next complete record after a byte offset
///
/// This is critical for parallel processing: each thread must start
/// at a record boundary, not in the middle of a record.
///
/// # Strategy
/// 1. Seek to the chunk start position
/// 2. If not at file start, scan forward until we find a line starting with '@'
/// 3. For FASTQ specifically, we need to find the '@' that starts a record
///
/// # Returns
/// - `Ok(Some(offset))` - Position of the next record boundary
/// - `Ok(None)` - No more records (hit EOF)
/// - `Err(_)` - I/O error
///
/// # Note
/// This is a simple implementation that works for FASTQ. For other formats,
/// you may need format-specific boundary detection.
pub fn find_record_boundary<R: BufRead + Seek>(reader: &mut R, offset: u64) -> Result<Option<u64>> {
    // Special case: start of file is always a record boundary
    if offset == 0 {
        return Ok(Some(0));
    }

    // Seek to the offset
    reader.seek(SeekFrom::Start(offset))?;

    // Skip to end of current line (we're likely in the middle of a line)
    let mut discard = Vec::new();
    let first_skip = reader.read_until(b'\n', &mut discard)?;
    if first_skip == 0 {
        return Ok(None); // EOF
    }

    let mut current_pos = offset + first_skip as u64;

    // Now scan forward looking for a line that starts with '@'
    // (FASTQ record start marker)
    loop {
        let mut line = Vec::new();
        let bytes_read = reader.read_until(b'\n', &mut line)?;

        if bytes_read == 0 {
            // Hit EOF without finding a record start
            return Ok(None);
        }

        // Check if this line starts with '@' (FASTQ record header)
        if !line.is_empty() && line[0] == b'@' {
            // Found a record boundary!
            return Ok(Some(current_pos));
        }

        current_pos += bytes_read as u64;
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_parallel_config_defaults() {
        let config = ParallelConfig::default();
        assert!(config.threads() > 0);
        assert_eq!(config.min_chunk_size, 5 * 1024 * 1024);
    }

    #[test]
    fn test_parallel_config_builder() {
        let config = ParallelConfig::new()
            .with_threads(4)
            .with_chunk_size(5_000_000)
            .with_max_chunks(16);

        assert_eq!(config.threads(), 4);
        assert_eq!(config.min_chunk_size, 5_000_000);
        assert_eq!(config.max_chunks(), 16);
    }

    #[test]
    fn test_calculate_chunks_small_file() {
        let config = ParallelConfig::new().with_threads(4);

        // 1 MB file, 10 MB min chunk size -> 1 chunk
        let chunks = calculate_chunks(1_000_000, &config);
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].start, 0);
        assert_eq!(chunks[0].end, 1_000_000);
    }

    #[test]
    fn test_calculate_chunks_large_file() {
        let config = ParallelConfig::new()
            .with_threads(4)
            .with_chunk_size(10_000_000);

        // 100 MB file, 10 MB min chunk -> 4 chunks (one per thread)
        let chunks = calculate_chunks(100_000_000, &config);
        assert_eq!(chunks.len(), 4);

        // Each chunk should be ~25 MB
        for (i, chunk) in chunks.iter().enumerate() {
            assert_eq!(chunk.index, i);
            if i < 3 {
                assert_eq!(chunk.size(), 25_000_000);
            } else {
                // Last chunk gets remainder
                assert_eq!(chunk.size(), 25_000_000);
            }
        }
    }

    #[test]
    fn test_calculate_chunks_empty_file() {
        let config = ParallelConfig::default();
        let chunks = calculate_chunks(0, &config);
        assert_eq!(chunks.len(), 0);
    }

    #[test]
    fn test_find_record_boundary_at_start() {
        let data = b"@READ1\nACGT\n+\nIIII\n@READ2\nGGGG\n+\nIIII\n";
        let mut cursor = Cursor::new(data);

        let boundary = find_record_boundary(&mut cursor, 0).unwrap();
        assert_eq!(boundary, Some(0));
    }

    #[test]
    fn test_find_record_boundary_mid_record() {
        let data = b"@READ1\nACGT\n+\nIIII\n@READ2\nGGGG\n+\nIIII\n";
        let mut cursor = Cursor::new(data);

        // Seek to position 10 (in the middle of first record)
        let boundary = find_record_boundary(&mut cursor, 10).unwrap();

        // Should skip to after the next newline
        assert!(boundary.unwrap() > 10);
        assert!(boundary.unwrap() <= data.len() as u64);
    }

    #[test]
    fn test_find_record_boundary_at_eof() {
        let data = b"@READ1\nACGT\n+\nIIII\n";
        let mut cursor = Cursor::new(data);

        let boundary = find_record_boundary(&mut cursor, data.len() as u64).unwrap();
        assert_eq!(boundary, None); // EOF
    }

    #[test]
    fn test_file_chunk_size() {
        let chunk = FileChunk::new(0, 1000, 0);
        assert_eq!(chunk.size(), 1000);

        let chunk = FileChunk::new(500, 1500, 1);
        assert_eq!(chunk.size(), 1000);
    }
}