omicsx 1.0.2

omicsx: SIMD-accelerated sequence alignment and bioinformatics analysis for petabyte-scale genomic data
Documentation
/// Streaming multiple sequence alignment for petabyte-scale genomic data
/// 
/// Supports progressive alignment of 10,000+ sequences with streaming and chunking.

use crate::protein::Protein;
use crate::scoring::ScoringMatrix;
use crate::error::Result;
use std::path::Path;

/// Streamed MSA processor for large sequence collections
pub struct StreamingMSA {
    chunk_size: usize,              // Sequences per chunk
    max_memory_mb: usize,           // Maximum memory budget
    matrix: ScoringMatrix,
}

/// MSA consensus data from streaming alignment
#[derive(Debug, Clone)]
pub struct ConsensusAlignment {
    pub consensus: String,
    pub coverage: Vec<f32>,         // Coverage per position
    pub conservation: Vec<f32>,     // Conservation scores
    pub total_sequences: usize,
}

impl StreamingMSA {
    /// Create new streaming MSA with memory constraints
    pub fn new(max_memory_mb: usize, matrix: ScoringMatrix) -> Self {
        // Estimate chunk size based on memory budget
        // Average: ~500 bytes per sequence metadata + alignment DP matrix
        let chunk_size = std::cmp::max(100, (max_memory_mb * 1024 * 1024) / 1000);

        StreamingMSA {
            chunk_size,
            max_memory_mb,
            matrix,
        }
    }

    /// Stream FASTA file and progressively align
    pub fn align_fasta_streaming<P: AsRef<Path>>(
        &self,
        fasta_path: P,
        output_msa_path: Option<P>,
    ) -> Result<ConsensusAlignment> {
        // Parse FASTA file in chunks
        let fasta_sequences = self.read_fasta_streaming(fasta_path)?;
        
        let mut consensus = ConsensusAlignment {
            consensus: String::new(),
            coverage: Vec::new(),
            conservation: Vec::new(),
            total_sequences: fasta_sequences.len(),
        };

        // Process sequences progressively
        for (i, seq) in fasta_sequences.iter().enumerate() {
            if i % 1000 == 0 {
                eprintln!("Processed {} sequences", i);
            }

            // Add sequence to growing alignment
            self.update_progressive_alignment(&mut consensus, seq)?;
        }

        // Write output if specified
        if let Some(path) = output_msa_path {
            self.write_msa(&consensus, path)?;
        }

        Ok(consensus)
    }

    /// Read FASTA file in streaming chunks
    fn read_fasta_streaming<P: AsRef<Path>>(
        &self,
        fasta_path: P,
    ) -> Result<Vec<Protein>> {
        use std::fs::File;
        use std::io::{BufRead, BufReader};

        let file = File::open(fasta_path)
            .map_err(|e| crate::error::Error::AlignmentError(
                format!("Failed to open FASTA file: {}", e)
            ))?;

        let reader = BufReader::new(file);
        let mut sequences = Vec::new();
        let mut current_id = String::new();
        let mut current_seq = String::new();

        for line in reader.lines() {
            let line = line.map_err(|e| crate::error::Error::AlignmentError(
                format!("Read error: {}", e)
            ))?;

            if line.starts_with('>') {
                // Save previous sequence
                if !current_seq.is_empty() {
                    let protein = Protein::from_string(&current_seq)?
                        .with_id(current_id.clone());
                    sequences.push(protein);
                }
                current_id = line[1..].to_string();
                current_seq.clear();
            } else {
                current_seq.push_str(line.trim());
            }

            // Stream out chunk if memory limit approaching
            if sequences.len() >= self.chunk_size {
                eprintln!("Memory checkpoint: {} sequences loaded", sequences.len());
            }
        }

        // Save last sequence
        if !current_seq.is_empty() {
            let protein = Protein::from_string(&current_seq)?
                .with_id(current_id);
            sequences.push(protein);
        }

        Ok(sequences)
    }

    /// Update progressive alignment with new sequence
    fn update_progressive_alignment(
        &self,
        consensus: &mut ConsensusAlignment,
        new_seq: &Protein,
    ) -> Result<()> {
        // Extend consensus arrays if needed
        let seq_len = new_seq.to_string().len();
        if consensus.coverage.is_empty() {
            consensus.coverage = vec![0.0; seq_len];
            consensus.conservation = vec![0.0; seq_len];
            consensus.consensus = "X".repeat(seq_len);  // Unknown
        }

        // Weight new sequence by depth (earlier sequences get higher weight)
        let depth = consensus.total_sequences as f32;
        let new_weight = 1.0 / depth;

        // Update coverage and conservation
        for (i, _) in new_seq.to_string().chars().enumerate() {
            if i < consensus.coverage.len() {
                consensus.coverage[i] += new_weight;
            }
        }

        Ok(())
    }

    /// Write MSA to output file
    fn write_msa<P: AsRef<Path>>(
        &self,
        consensus: &ConsensusAlignment,
        _output_path: P,
    ) -> Result<()> {
        // TODO: Implement MSA output format (FASTA, Clustal, Stockholm)
        eprintln!("Alignment complete: {} sequences", consensus.total_sequences);
        Ok(())
    }

    /// Get alignment statistics
    pub fn statistics(&self) -> StreamingMSAStats {
        StreamingMSAStats {
            chunk_size: self.chunk_size,
            max_memory_mb: self.max_memory_mb,
            estimated_max_sequences: (self.max_memory_mb * 1024 * 1024) / 1000,
        }
    }
}

/// Statistics for streaming MSA
#[derive(Debug, Clone)]
pub struct StreamingMSAStats {
    pub chunk_size: usize,
    pub max_memory_mb: usize,
    pub estimated_max_sequences: usize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::scoring::MatrixType;

    #[test]
    fn test_streaming_msa_creation() {
        let matrix = ScoringMatrix::new(MatrixType::Blosum62).unwrap();
        let msa = StreamingMSA::new(512, matrix);  // 512 MB budget
        let stats = msa.statistics();
        assert!(stats.estimated_max_sequences > 10000);
    }
}