use crate::protein::Protein;
use crate::scoring::ScoringMatrix;
use crate::error::Result;
use std::path::Path;
/// Aligner that reads a FASTA file and folds its records into a [`ConsensusAlignment`],
/// sizing its work chunks from a memory budget.
pub struct StreamingMSA {
    /// Number of records per memory-checkpoint chunk (derived from `max_memory_mb`).
    chunk_size: usize,
    /// Memory budget in MiB supplied at construction.
    max_memory_mb: usize,
    /// Substitution matrix supplied at construction.
    /// NOTE(review): not read by the methods visible in this module — confirm its intended use.
    matrix: ScoringMatrix,
}
/// Per-position summary produced by [`StreamingMSA::align_fasta_streaming`].
#[derive(Debug, Clone)]
pub struct ConsensusAlignment {
    /// Consensus residues, one character per profile position.
    pub consensus: String,
    /// Per-position fraction of input sequences that reach that position.
    pub coverage: Vec<f32>,
    /// Per-position conservation score.
    pub conservation: Vec<f32>,
    /// Number of sequences read from the input.
    pub total_sequences: usize,
}
impl StreamingMSA {
pub fn new(max_memory_mb: usize, matrix: ScoringMatrix) -> Self {
let chunk_size = std::cmp::max(100, (max_memory_mb * 1024 * 1024) / 1000);
StreamingMSA {
chunk_size,
max_memory_mb,
matrix,
}
}
pub fn align_fasta_streaming<P: AsRef<Path>>(
&self,
fasta_path: P,
output_msa_path: Option<P>,
) -> Result<ConsensusAlignment> {
let fasta_sequences = self.read_fasta_streaming(fasta_path)?;
let mut consensus = ConsensusAlignment {
consensus: String::new(),
coverage: Vec::new(),
conservation: Vec::new(),
total_sequences: fasta_sequences.len(),
};
for (i, seq) in fasta_sequences.iter().enumerate() {
if i % 1000 == 0 {
eprintln!("Processed {} sequences", i);
}
self.update_progressive_alignment(&mut consensus, seq)?;
}
if let Some(path) = output_msa_path {
self.write_msa(&consensus, path)?;
}
Ok(consensus)
}
fn read_fasta_streaming<P: AsRef<Path>>(
&self,
fasta_path: P,
) -> Result<Vec<Protein>> {
use std::fs::File;
use std::io::{BufRead, BufReader};
let file = File::open(fasta_path)
.map_err(|e| crate::error::Error::AlignmentError(
format!("Failed to open FASTA file: {}", e)
))?;
let reader = BufReader::new(file);
let mut sequences = Vec::new();
let mut current_id = String::new();
let mut current_seq = String::new();
for line in reader.lines() {
let line = line.map_err(|e| crate::error::Error::AlignmentError(
format!("Read error: {}", e)
))?;
if line.starts_with('>') {
if !current_seq.is_empty() {
let protein = Protein::from_string(¤t_seq)?
.with_id(current_id.clone());
sequences.push(protein);
}
current_id = line[1..].to_string();
current_seq.clear();
} else {
current_seq.push_str(line.trim());
}
if sequences.len() >= self.chunk_size {
eprintln!("Memory checkpoint: {} sequences loaded", sequences.len());
}
}
if !current_seq.is_empty() {
let protein = Protein::from_string(¤t_seq)?
.with_id(current_id);
sequences.push(protein);
}
Ok(sequences)
}
fn update_progressive_alignment(
&self,
consensus: &mut ConsensusAlignment,
new_seq: &Protein,
) -> Result<()> {
let seq_len = new_seq.to_string().len();
if consensus.coverage.is_empty() {
consensus.coverage = vec![0.0; seq_len];
consensus.conservation = vec![0.0; seq_len];
consensus.consensus = "X".repeat(seq_len); }
let depth = consensus.total_sequences as f32;
let new_weight = 1.0 / depth;
for (i, _) in new_seq.to_string().chars().enumerate() {
if i < consensus.coverage.len() {
consensus.coverage[i] += new_weight;
}
}
Ok(())
}
fn write_msa<P: AsRef<Path>>(
&self,
consensus: &ConsensusAlignment,
_output_path: P,
) -> Result<()> {
eprintln!("Alignment complete: {} sequences", consensus.total_sequences);
Ok(())
}
pub fn statistics(&self) -> StreamingMSAStats {
StreamingMSAStats {
chunk_size: self.chunk_size,
max_memory_mb: self.max_memory_mb,
estimated_max_sequences: (self.max_memory_mb * 1024 * 1024) / 1000,
}
}
}
/// Snapshot of the sizing parameters of a [`StreamingMSA`].
#[derive(Debug, Clone)]
pub struct StreamingMSAStats {
    /// Records per memory-checkpoint chunk.
    pub chunk_size: usize,
    /// Memory budget in MiB.
    pub max_memory_mb: usize,
    /// Sequences the budget can hold under the per-sequence size estimate.
    pub estimated_max_sequences: usize,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scoring::MatrixType;

    /// A 512 MiB budget should translate into room for well over ten thousand sequences.
    #[test]
    fn test_streaming_msa_creation() {
        let blosum = ScoringMatrix::new(MatrixType::Blosum62)
            .expect("failed to build BLOSUM62 scoring matrix");
        let aligner = StreamingMSA::new(512, blosum);
        let report = aligner.statistics();
        assert!(report.estimated_max_sequences > 10000);
    }
}