use md5::Md5;
use sha2::{Digest, Sha512};
use std::io::{self, Write};
use super::alphabet::AlphabetGuesser;
use super::fasta::parse_fasta_header;
use super::types::{
SequenceCollection, SequenceCollectionMetadata, SequenceMetadata, SequenceRecord,
};
/// Parser position within the FASTA stream: either waiting for the first
/// `>` header line, or consuming sequence lines for the current record.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ParserState {
    /// No header seen yet; non-header lines are ignored in this state.
    AwaitingHeader,
    /// A header has been seen; subsequent non-header lines are sequence data.
    InSequence,
}
/// Incremental FASTA parser that digests each record's sequence bytes
/// (SHA-512 and MD5) and guesses its alphabet as data streams in.
struct FastaProcessor {
    /// Whether we are before the first header or inside a record body.
    state: ParserState,
    /// Bytes of the current, possibly incomplete line (no terminator stored).
    line_buffer: Vec<u8>,
    /// Name of the record currently being accumulated, if any.
    current_name: Option<String>,
    /// Optional description parsed from the current record's header.
    current_description: Option<String>,
    /// Number of non-whitespace sequence bytes seen for the current record.
    current_length: usize,
    /// Running SHA-512 over the current record's normalized sequence bytes.
    sha512_hasher: Sha512,
    /// Running MD5 over the current record's normalized sequence bytes.
    md5_hasher: Md5,
    /// Tracks which alphabet the current record's bytes are compatible with.
    alphabet_guesser: AlphabetGuesser,
    /// Finished records, in input order.
    sequences: Vec<SequenceRecord>,
    /// First error encountered; once set, all further input is ignored.
    processing_error: Option<String>,
}
impl FastaProcessor {
fn new() -> Self {
Self {
state: ParserState::AwaitingHeader,
line_buffer: Vec::with_capacity(8192),
current_name: None,
current_description: None,
current_length: 0,
sha512_hasher: Sha512::new(),
md5_hasher: Md5::new(),
alphabet_guesser: AlphabetGuesser::new(),
sequences: Vec::new(),
processing_error: None,
}
}
fn process_byte(&mut self, byte: u8) {
if self.processing_error.is_some() {
return;
}
if byte == b'\n' || byte == b'\r' {
if let Err(e) = self.process_line() {
self.processing_error = Some(e.to_string());
}
self.line_buffer.clear();
} else {
self.line_buffer.push(byte);
}
}
fn process_line(&mut self) -> anyhow::Result<()> {
if self.line_buffer.is_empty() {
return Ok(());
}
if self.line_buffer[0] == b'>' {
if self.current_name.is_some() {
self.finalize_current_sequence();
}
let header = std::str::from_utf8(&self.line_buffer[1..])
.map_err(|e| anyhow::anyhow!("Invalid UTF-8 in header: {}", e))?;
let (name, description) = parse_fasta_header(header);
self.current_name = Some(name);
self.current_description = description;
self.current_length = 0;
self.sha512_hasher = Sha512::new();
self.md5_hasher = Md5::new();
self.alphabet_guesser = AlphabetGuesser::new();
self.state = ParserState::InSequence;
} else if self.state == ParserState::InSequence && self.current_name.is_some() {
let uppercased: Vec<u8> = self
.line_buffer
.iter()
.filter(|&&b| !b.is_ascii_whitespace())
.map(|b| b.to_ascii_uppercase())
.collect();
if !uppercased.is_empty() {
self.sha512_hasher.update(&uppercased);
self.md5_hasher.update(&uppercased);
self.alphabet_guesser.update(&uppercased);
self.current_length += uppercased.len();
}
}
Ok(())
}
fn finalize_current_sequence(&mut self) {
if let Some(name) = self.current_name.take() {
let sha512 = base64_url::encode(&self.sha512_hasher.clone().finalize()[0..24]);
let md5 = format!("{:x}", self.md5_hasher.clone().finalize());
let alphabet = self.alphabet_guesser.guess();
let metadata = SequenceMetadata {
name,
description: self.current_description.take(),
length: self.current_length,
sha512t24u: sha512,
md5,
alphabet,
fai: None,
};
self.sequences.push(SequenceRecord::Stub(metadata));
}
}
fn finish(mut self) -> anyhow::Result<SequenceCollection> {
if let Some(err) = self.processing_error {
return Err(anyhow::anyhow!("Processing error: {}", err));
}
if !self.line_buffer.is_empty() {
self.process_line()?;
}
if self.current_name.is_some() {
self.finalize_current_sequence();
}
let metadata = SequenceCollectionMetadata::from_sequences(&self.sequences, None);
Ok(SequenceCollection {
metadata,
sequences: self.sequences,
})
}
}
impl Write for FastaProcessor {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
for &byte in buf {
self.process_byte(byte);
if let Some(ref err) = self.processing_error {
return Err(io::Error::new(io::ErrorKind::InvalidData, err.clone()));
}
}
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
/// Internal state of the stream hasher: still sniffing the input format, or
/// committed to plain or gzip-compressed FASTA.
enum ProcessorState {
    /// Format not yet determined; buffers bytes seen so far (at most one,
    /// since two bytes suffice to check the gzip magic number 0x1f 0x8b).
    Detecting(Vec<u8>),
    /// Uncompressed FASTA fed straight to the parser.
    Plain(FastaProcessor),
    /// Gzip-compressed FASTA routed through a streaming decoder.
    Gzipped(flate2::write::GzDecoder<FastaProcessor>),
}
/// Streaming FASTA hasher: accepts arbitrary chunks of FASTA bytes
/// (plain or gzipped, auto-detected) via [`FastaStreamHasher::update`] and
/// produces a `SequenceCollection` from [`FastaStreamHasher::finish`].
pub struct FastaStreamHasher {
    state: ProcessorState,
}
impl FastaStreamHasher {
    /// Create a hasher that will auto-detect plain vs. gzipped input.
    pub fn new() -> Self {
        Self {
            state: ProcessorState::Detecting(Vec::new()),
        }
    }
    /// Feed the next chunk of input.
    ///
    /// The first two bytes of the stream are inspected for the gzip magic
    /// number. Bytes are buffered until two are available, so detection is
    /// correct even when the caller feeds one byte at a time (previously a
    /// one-byte first chunk caused gzipped input to be misread as plain).
    ///
    /// # Errors
    /// Returns an error when the parser reports invalid data or gzip
    /// decompression fails.
    pub fn update(&mut self, chunk: &[u8]) -> anyhow::Result<()> {
        if chunk.is_empty() {
            return Ok(());
        }
        if let ProcessorState::Detecting(pending) = &mut self.state {
            if pending.len() + chunk.len() < 2 {
                // Not enough bytes yet to check the magic number; buffer.
                pending.extend_from_slice(chunk);
                return Ok(());
            }
            let mut buffered = std::mem::take(pending);
            buffered.extend_from_slice(chunk);
            let is_gzipped = buffered[0] == 0x1f && buffered[1] == 0x8b;
            self.state = if is_gzipped {
                ProcessorState::Gzipped(flate2::write::GzDecoder::new(FastaProcessor::new()))
            } else {
                ProcessorState::Plain(FastaProcessor::new())
            };
            // Replay everything seen so far through the chosen pipeline.
            match &mut self.state {
                ProcessorState::Plain(processor) => processor.write_all(&buffered)?,
                ProcessorState::Gzipped(decoder) => decoder.write_all(&buffered)?,
                ProcessorState::Detecting(_) => unreachable!(),
            }
            return Ok(());
        }
        match &mut self.state {
            // Non-empty chunk always resolves detection above.
            ProcessorState::Detecting(_) => unreachable!(),
            ProcessorState::Plain(processor) => {
                processor.write_all(chunk)?;
            }
            ProcessorState::Gzipped(decoder) => {
                decoder.write_all(chunk)?;
            }
        }
        Ok(())
    }
    /// Finish the stream and return the accumulated sequence collection.
    ///
    /// # Errors
    /// Returns an error for trailing invalid data or a truncated/corrupt
    /// gzip stream.
    pub fn finish(self) -> anyhow::Result<SequenceCollection> {
        match self.state {
            ProcessorState::Detecting(pending) => {
                // Fewer than two bytes ever arrived: treat whatever we have
                // (possibly nothing) as plain FASTA.
                let mut processor = FastaProcessor::new();
                processor.write_all(&pending)?;
                processor.finish()
            }
            ProcessorState::Plain(processor) => processor.finish(),
            ProcessorState::Gzipped(decoder) => {
                let processor = decoder
                    .finish()
                    .map_err(|e| anyhow::anyhow!("Gzip decompression error: {}", e))?;
                processor.finish()
            }
        }
    }
    /// Number of fully parsed sequences so far. For gzipped input this
    /// reflects only data the decoder has already flushed downstream.
    pub fn sequence_count(&self) -> usize {
        match &self.state {
            ProcessorState::Detecting(_) => 0,
            ProcessorState::Plain(p) => p.sequences.len(),
            ProcessorState::Gzipped(d) => d.get_ref().sequences.len(),
        }
    }
    /// Whether a sequence record is currently being accumulated.
    pub fn in_sequence(&self) -> bool {
        match &self.state {
            ProcessorState::Detecting(_) => false,
            ProcessorState::Plain(p) => p.current_name.is_some(),
            ProcessorState::Gzipped(d) => d.get_ref().current_name.is_some(),
        }
    }
    /// Name of the sequence currently being accumulated, if any.
    pub fn current_sequence_name(&self) -> Option<&str> {
        match &self.state {
            ProcessorState::Detecting(_) => None,
            ProcessorState::Plain(p) => p.current_name.as_deref(),
            ProcessorState::Gzipped(d) => d.get_ref().current_name.as_deref(),
        }
    }
    /// Length (in residues) of the sequence currently being accumulated.
    pub fn current_sequence_length(&self) -> usize {
        match &self.state {
            ProcessorState::Detecting(_) => 0,
            ProcessorState::Plain(p) => p.current_length,
            ProcessorState::Gzipped(d) => d.get_ref().current_length,
        }
    }
}
impl Default for FastaStreamHasher {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::digest::alphabet::AlphabetType;
    use crate::digest::fasta::digest_fasta_bytes;
    // Two records delivered in a single chunk parse into two sequences.
    #[test]
    fn test_streaming_basic() {
        let mut hasher = FastaStreamHasher::new();
        hasher
            .update(b">chr1\nACGT\n>chr2\nTGCA\n")
            .expect("update");
        let collection = hasher.finish().expect("finish");
        assert_eq!(collection.sequences.len(), 2);
        assert_eq!(collection.sequences[0].metadata().name, "chr1");
        assert_eq!(collection.sequences[0].metadata().length, 4);
        assert_eq!(collection.sequences[1].metadata().name, "chr2");
        assert_eq!(collection.sequences[1].metadata().length, 4);
    }
    // Chunk boundaries falling mid-sequence must not affect the result.
    #[test]
    fn test_streaming_chunked() {
        let mut hasher = FastaStreamHasher::new();
        hasher.update(b">chr1\nAC").expect("chunk 1");
        hasher.update(b"GT\n>chr2\n").expect("chunk 2");
        hasher.update(b"TGCA\n").expect("chunk 3");
        let collection = hasher.finish().expect("finish");
        assert_eq!(collection.sequences.len(), 2);
        assert_eq!(collection.sequences[0].metadata().name, "chr1");
        assert_eq!(collection.sequences[0].metadata().length, 4);
        assert_eq!(collection.sequences[1].metadata().name, "chr2");
        assert_eq!(collection.sequences[1].metadata().length, 4);
    }
    // A header split across chunks is reassembled, including its description.
    #[test]
    fn test_streaming_split_header() {
        let mut hasher = FastaStreamHasher::new();
        hasher.update(b">ch").expect("chunk 1");
        hasher.update(b"r1 description\nACGT\n").expect("chunk 2");
        let collection = hasher.finish().expect("finish");
        assert_eq!(collection.sequences.len(), 1);
        assert_eq!(collection.sequences[0].metadata().name, "chr1");
        assert_eq!(
            collection.sequences[0].metadata().description,
            Some("description".to_string())
        );
    }
    // Streaming must produce byte-identical digests to the batch API.
    #[test]
    fn test_streaming_matches_batch() {
        let fasta_data = b">chrX\nTTGGGGAA\n>chr1\nGGAA\n>chr2\nGCGC\n";
        let batch_result = digest_fasta_bytes(fasta_data).expect("batch");
        let mut hasher = FastaStreamHasher::new();
        hasher.update(fasta_data).expect("streaming");
        let stream_result = hasher.finish().expect("finish");
        // Collection-level digests must agree.
        assert_eq!(batch_result.metadata.digest, stream_result.metadata.digest);
        assert_eq!(
            batch_result.metadata.names_digest,
            stream_result.metadata.names_digest
        );
        assert_eq!(
            batch_result.metadata.sequences_digest,
            stream_result.metadata.sequences_digest
        );
        assert_eq!(
            batch_result.metadata.lengths_digest,
            stream_result.metadata.lengths_digest
        );
        // Per-sequence metadata must also agree, field by field.
        for (batch_seq, stream_seq) in batch_result
            .sequences
            .iter()
            .zip(stream_result.sequences.iter())
        {
            assert_eq!(batch_seq.metadata().name, stream_seq.metadata().name);
            assert_eq!(batch_seq.metadata().length, stream_seq.metadata().length);
            assert_eq!(
                batch_seq.metadata().sha512t24u,
                stream_seq.metadata().sha512t24u
            );
            assert_eq!(batch_seq.metadata().md5, stream_seq.metadata().md5);
            assert_eq!(
                batch_seq.metadata().alphabet,
                stream_seq.metadata().alphabet
            );
        }
    }
    // Sequence data spanning multiple lines accumulates into one length.
    #[test]
    fn test_streaming_multiline_sequence() {
        let mut hasher = FastaStreamHasher::new();
        hasher.update(b">chr1\nACGT\nTGCA\nAAAA\n").expect("update");
        let collection = hasher.finish().expect("finish");
        assert_eq!(collection.sequences.len(), 1);
        assert_eq!(collection.sequences[0].metadata().length, 12);
    }
    // Finishing with no input at all yields an empty collection, not an error.
    #[test]
    fn test_streaming_empty() {
        let hasher = FastaStreamHasher::new();
        let collection = hasher.finish().expect("finish");
        assert_eq!(collection.sequences.len(), 0);
    }
    // Pins known digest values (refget sha512t24u, md5) for a fixed input.
    #[test]
    fn test_streaming_known_digest() {
        let mut hasher = FastaStreamHasher::new();
        hasher.update(b">chrX\nTTGGGGAA\n").expect("update");
        let collection = hasher.finish().expect("finish");
        assert_eq!(
            collection.sequences[0].metadata().sha512t24u,
            "iYtREV555dUFKg2_agSJW6suquUyPpMw"
        );
        assert_eq!(
            collection.sequences[0].metadata().md5,
            "5f63cfaa3ef61f88c9635fb9d18ec945"
        );
        assert_eq!(
            collection.sequences[0].metadata().alphabet,
            AlphabetType::Dna2bit
        );
    }
    // Gzipped input delivered in one chunk is auto-detected and decompressed.
    #[test]
    fn test_streaming_gzipped() {
        use flate2::Compression;
        use flate2::write::GzEncoder;
        let fasta = b">chr1\nACGT\n";
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(fasta).expect("compress");
        let compressed = encoder.finish().expect("finish compression");
        let mut hasher = FastaStreamHasher::new();
        hasher.update(&compressed).expect("update");
        let collection = hasher.finish().expect("finish");
        assert_eq!(collection.sequences.len(), 1);
        assert_eq!(collection.sequences[0].metadata().name, "chr1");
        assert_eq!(collection.sequences[0].metadata().length, 4);
    }
    // Gzipped input split into small chunks still round-trips correctly.
    #[test]
    fn test_streaming_gzipped_chunked() {
        use flate2::Compression;
        use flate2::write::GzEncoder;
        let fasta = b">chr1\nACGTTGCA\n>chr2\nGGGGAAAA\n";
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(fasta).expect("compress");
        let compressed = encoder.finish().expect("finish compression");
        let mut hasher = FastaStreamHasher::new();
        for chunk in compressed.chunks(5) {
            hasher.update(chunk).expect("update chunk");
        }
        let collection = hasher.finish().expect("finish");
        assert_eq!(collection.sequences.len(), 2);
        assert_eq!(collection.sequences[0].metadata().name, "chr1");
        assert_eq!(collection.sequences[0].metadata().length, 8);
        assert_eq!(collection.sequences[1].metadata().name, "chr2");
        assert_eq!(collection.sequences[1].metadata().length, 8);
    }
    // The progress accessors reflect mid-stream state between updates.
    #[test]
    fn test_streaming_progress() {
        let mut hasher = FastaStreamHasher::new();
        assert_eq!(hasher.sequence_count(), 0);
        assert!(!hasher.in_sequence());
        assert!(hasher.current_sequence_name().is_none());
        hasher.update(b">chr1\n").expect("header");
        assert!(hasher.in_sequence());
        assert_eq!(hasher.current_sequence_name(), Some("chr1"));
        assert_eq!(hasher.current_sequence_length(), 0);
        hasher.update(b"ACGT\n").expect("sequence");
        assert_eq!(hasher.current_sequence_length(), 4);
        hasher.update(b">chr2\n").expect("next header");
        assert_eq!(hasher.sequence_count(), 1); assert_eq!(hasher.current_sequence_name(), Some("chr2"));
    }
    // Top-level collection digest must be chunk-size invariant (including 1).
    #[test]
    fn test_streaming_chunked_matches_batch() {
        let fasta_data = b">chrX\nTTGGGGAA\n>chr1\nGGAA\n>chr2\nGCGC\n";
        let batch_result = digest_fasta_bytes(fasta_data).expect("batch");
        for chunk_size in [1, 2, 3, 5, 7, 11, 13, 17] {
            let mut hasher = FastaStreamHasher::new();
            for chunk in fasta_data.chunks(chunk_size) {
                hasher.update(chunk).expect("streaming chunk");
            }
            let stream_result = hasher.finish().expect("finish");
            assert_eq!(
                batch_result.metadata.digest, stream_result.metadata.digest,
                "Mismatch with chunk size {}",
                chunk_size
            );
        }
    }
}