1use md5::Md5;
8use sha2::{Digest, Sha512};
9use std::io::{self, Write};
10
11use super::alphabet::AlphabetGuesser;
12use super::fasta::parse_fasta_header;
13use super::types::{
14 SequenceCollection, SequenceCollectionMetadata, SequenceMetadata, SequenceRecord,
15};
16
/// Position of the parser within the FASTA stream.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ParserState {
    /// No `>` header line has been seen yet; data lines are ignored.
    AwaitingHeader,
    /// A header has been seen; subsequent non-header lines are sequence data.
    InSequence,
}
25
/// Incremental line-oriented FASTA parser that hashes each sequence as it
/// streams through, without retaining sequence data in memory.
struct FastaProcessor {
    /// Whether we are before the first header or inside a sequence body.
    state: ParserState,
    /// Bytes of the line currently being accumulated (no terminator).
    line_buffer: Vec<u8>,
    /// Name of the sequence currently being parsed, if any.
    current_name: Option<String>,
    /// Optional description from the current header line.
    current_description: Option<String>,
    /// Residue count (whitespace stripped) of the current sequence so far.
    current_length: usize,
    /// Running SHA-512 of the normalized (uppercased) sequence bytes.
    sha512_hasher: Sha512,
    /// Running MD5 of the normalized (uppercased) sequence bytes.
    md5_hasher: Md5,
    /// Tracks which alphabet the current sequence's residues fit.
    alphabet_guesser: AlphabetGuesser,
    /// Completed sequence records, in input order.
    sequences: Vec<SequenceRecord>,
    /// First error encountered; once set, further input is ignored and the
    /// error is surfaced by `finish`.
    processing_error: Option<String>,
}
41
42impl FastaProcessor {
43 fn new() -> Self {
44 Self {
45 state: ParserState::AwaitingHeader,
46 line_buffer: Vec::with_capacity(8192),
47 current_name: None,
48 current_description: None,
49 current_length: 0,
50 sha512_hasher: Sha512::new(),
51 md5_hasher: Md5::new(),
52 alphabet_guesser: AlphabetGuesser::new(),
53 sequences: Vec::new(),
54 processing_error: None,
55 }
56 }
57
58 fn process_byte(&mut self, byte: u8) {
59 if self.processing_error.is_some() {
61 return;
62 }
63
64 if byte == b'\n' || byte == b'\r' {
65 if let Err(e) = self.process_line() {
66 self.processing_error = Some(e.to_string());
67 }
68 self.line_buffer.clear();
69 } else {
70 self.line_buffer.push(byte);
71 }
72 }
73
74 fn process_line(&mut self) -> anyhow::Result<()> {
75 if self.line_buffer.is_empty() {
76 return Ok(());
77 }
78
79 if self.line_buffer[0] == b'>' {
80 if self.current_name.is_some() {
82 self.finalize_current_sequence();
83 }
84
85 let header = std::str::from_utf8(&self.line_buffer[1..])
87 .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in header: {}", e))?;
88 let (name, description) = parse_fasta_header(header);
89
90 self.current_name = Some(name);
92 self.current_description = description;
93 self.current_length = 0;
94 self.sha512_hasher = Sha512::new();
95 self.md5_hasher = Md5::new();
96 self.alphabet_guesser = AlphabetGuesser::new();
97 self.state = ParserState::InSequence;
98 } else if self.state == ParserState::InSequence && self.current_name.is_some() {
99 let uppercased: Vec<u8> = self
101 .line_buffer
102 .iter()
103 .filter(|&&b| !b.is_ascii_whitespace())
104 .map(|b| b.to_ascii_uppercase())
105 .collect();
106
107 if !uppercased.is_empty() {
108 self.sha512_hasher.update(&uppercased);
109 self.md5_hasher.update(&uppercased);
110 self.alphabet_guesser.update(&uppercased);
111 self.current_length += uppercased.len();
112 }
113 }
114
115 Ok(())
116 }
117
118 fn finalize_current_sequence(&mut self) {
119 if let Some(name) = self.current_name.take() {
120 let sha512 = base64_url::encode(&self.sha512_hasher.clone().finalize()[0..24]);
121 let md5 = format!("{:x}", self.md5_hasher.clone().finalize());
122 let alphabet = self.alphabet_guesser.guess();
123
124 let metadata = SequenceMetadata {
125 name,
126 description: self.current_description.take(),
127 length: self.current_length,
128 sha512t24u: sha512,
129 md5,
130 alphabet,
131 fai: None,
132 };
133
134 self.sequences.push(SequenceRecord::Stub(metadata));
135 }
136 }
137
138 fn finish(mut self) -> anyhow::Result<SequenceCollection> {
139 if let Some(err) = self.processing_error {
141 return Err(anyhow::anyhow!("Processing error: {}", err));
142 }
143
144 if !self.line_buffer.is_empty() {
146 self.process_line()?;
147 }
148
149 if self.current_name.is_some() {
151 self.finalize_current_sequence();
152 }
153
154 let metadata = SequenceCollectionMetadata::from_sequences(&self.sequences, None);
156
157 Ok(SequenceCollection {
158 metadata,
159 sequences: self.sequences,
160 })
161 }
162}
163
164impl Write for FastaProcessor {
165 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
166 for &byte in buf {
167 self.process_byte(byte);
168 if let Some(ref err) = self.processing_error {
170 return Err(io::Error::new(io::ErrorKind::InvalidData, err.clone()));
171 }
172 }
173 Ok(buf.len())
174 }
175
176 fn flush(&mut self) -> io::Result<()> {
177 Ok(())
178 }
179}
180
/// Input-format state; format detection is deferred until the first
/// non-empty chunk arrives.
enum ProcessorState {
    /// No data seen yet; plain-vs-gzip is still unknown.
    Detecting,
    /// Uncompressed FASTA fed directly to the parser.
    Plain(FastaProcessor),
    /// Gzipped input routed through a streaming write-side decoder that
    /// feeds decompressed bytes into the parser.
    Gzipped(flate2::write::GzDecoder<FastaProcessor>),
}
190
/// Streaming FASTA hasher: accepts arbitrary byte chunks (plain or gzipped,
/// auto-detected) and yields a `SequenceCollection` on `finish`.
pub struct FastaStreamHasher {
    /// Current processing backend, chosen from the first chunk's bytes.
    state: ProcessorState,
}
219
220impl FastaStreamHasher {
221 pub fn new() -> Self {
223 Self {
224 state: ProcessorState::Detecting,
225 }
226 }
227
228 pub fn update(&mut self, chunk: &[u8]) -> anyhow::Result<()> {
240 if chunk.is_empty() {
241 return Ok(());
242 }
243
244 if matches!(self.state, ProcessorState::Detecting) {
246 let is_gzipped = chunk.len() >= 2 && chunk[0] == 0x1f && chunk[1] == 0x8b;
247
248 if is_gzipped {
249 let processor = FastaProcessor::new();
252 let decoder = flate2::write::GzDecoder::new(processor);
253 self.state = ProcessorState::Gzipped(decoder);
254 } else {
255 self.state = ProcessorState::Plain(FastaProcessor::new());
256 }
257 }
258
259 match &mut self.state {
261 ProcessorState::Detecting => unreachable!(),
262 ProcessorState::Plain(processor) => {
263 processor.write_all(chunk)?;
264 }
265 ProcessorState::Gzipped(decoder) => {
266 decoder.write_all(chunk)?;
269 }
270 }
271
272 Ok(())
273 }
274
275 pub fn finish(self) -> anyhow::Result<SequenceCollection> {
279 match self.state {
280 ProcessorState::Detecting => {
281 let metadata = SequenceCollectionMetadata::from_sequences(&[], None);
283 Ok(SequenceCollection {
284 metadata,
285 sequences: Vec::new(),
286 })
287 }
288 ProcessorState::Plain(processor) => processor.finish(),
289 ProcessorState::Gzipped(decoder) => {
290 let processor = decoder
292 .finish()
293 .map_err(|e| anyhow::anyhow!("Gzip decompression error: {}", e))?;
294 processor.finish()
295 }
296 }
297 }
298
299 pub fn sequence_count(&self) -> usize {
301 match &self.state {
302 ProcessorState::Detecting => 0,
303 ProcessorState::Plain(p) => p.sequences.len(),
304 ProcessorState::Gzipped(d) => d.get_ref().sequences.len(),
305 }
306 }
307
308 pub fn in_sequence(&self) -> bool {
310 match &self.state {
311 ProcessorState::Detecting => false,
312 ProcessorState::Plain(p) => p.current_name.is_some(),
313 ProcessorState::Gzipped(d) => d.get_ref().current_name.is_some(),
314 }
315 }
316
317 pub fn current_sequence_name(&self) -> Option<&str> {
319 match &self.state {
320 ProcessorState::Detecting => None,
321 ProcessorState::Plain(p) => p.current_name.as_deref(),
322 ProcessorState::Gzipped(d) => d.get_ref().current_name.as_deref(),
323 }
324 }
325
326 pub fn current_sequence_length(&self) -> usize {
328 match &self.state {
329 ProcessorState::Detecting => 0,
330 ProcessorState::Plain(p) => p.current_length,
331 ProcessorState::Gzipped(d) => d.get_ref().current_length,
332 }
333 }
334}
335
// `Default` mirrors `new` so the hasher works with generic code that
// constructs via `Default::default()`.
impl Default for FastaStreamHasher {
    fn default() -> Self {
        Self::new()
    }
}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::digest::alphabet::AlphabetType;
    use crate::digest::fasta::digest_fasta_bytes;

    // Two sequences delivered in a single update call.
    #[test]
    fn test_streaming_basic() {
        let mut hasher = FastaStreamHasher::new();
        hasher
            .update(b">chr1\nACGT\n>chr2\nTGCA\n")
            .expect("update");
        let collection = hasher.finish().expect("finish");

        assert_eq!(collection.sequences.len(), 2);
        assert_eq!(collection.sequences[0].metadata().name, "chr1");
        assert_eq!(collection.sequences[0].metadata().length, 4);
        assert_eq!(collection.sequences[1].metadata().name, "chr2");
        assert_eq!(collection.sequences[1].metadata().length, 4);
    }

    // Chunk boundaries fall mid-sequence and mid-record.
    #[test]
    fn test_streaming_chunked() {
        let mut hasher = FastaStreamHasher::new();

        hasher.update(b">chr1\nAC").expect("chunk 1");
        hasher.update(b"GT\n>chr2\n").expect("chunk 2");
        hasher.update(b"TGCA\n").expect("chunk 3");

        let collection = hasher.finish().expect("finish");

        assert_eq!(collection.sequences.len(), 2);
        assert_eq!(collection.sequences[0].metadata().name, "chr1");
        assert_eq!(collection.sequences[0].metadata().length, 4);
        assert_eq!(collection.sequences[1].metadata().name, "chr2");
        assert_eq!(collection.sequences[1].metadata().length, 4);
    }

    // A header line split across two chunks must still parse as one header.
    #[test]
    fn test_streaming_split_header() {
        let mut hasher = FastaStreamHasher::new();

        hasher.update(b">ch").expect("chunk 1");
        hasher.update(b"r1 description\nACGT\n").expect("chunk 2");

        let collection = hasher.finish().expect("finish");

        assert_eq!(collection.sequences.len(), 1);
        assert_eq!(collection.sequences[0].metadata().name, "chr1");
        assert_eq!(
            collection.sequences[0].metadata().description,
            Some("description".to_string())
        );
    }

    // Streaming digests must agree with the one-shot batch implementation.
    #[test]
    fn test_streaming_matches_batch() {
        let fasta_data = b">chrX\nTTGGGGAA\n>chr1\nGGAA\n>chr2\nGCGC\n";

        let batch_result = digest_fasta_bytes(fasta_data).expect("batch");

        let mut hasher = FastaStreamHasher::new();
        hasher.update(fasta_data).expect("streaming");
        let stream_result = hasher.finish().expect("finish");

        assert_eq!(batch_result.metadata.digest, stream_result.metadata.digest);
        assert_eq!(
            batch_result.metadata.names_digest,
            stream_result.metadata.names_digest
        );
        assert_eq!(
            batch_result.metadata.sequences_digest,
            stream_result.metadata.sequences_digest
        );
        assert_eq!(
            batch_result.metadata.lengths_digest,
            stream_result.metadata.lengths_digest
        );

        for (batch_seq, stream_seq) in batch_result
            .sequences
            .iter()
            .zip(stream_result.sequences.iter())
        {
            assert_eq!(batch_seq.metadata().name, stream_seq.metadata().name);
            assert_eq!(batch_seq.metadata().length, stream_seq.metadata().length);
            assert_eq!(
                batch_seq.metadata().sha512t24u,
                stream_seq.metadata().sha512t24u
            );
            assert_eq!(batch_seq.metadata().md5, stream_seq.metadata().md5);
            assert_eq!(
                batch_seq.metadata().alphabet,
                stream_seq.metadata().alphabet
            );
        }
    }

    // Line wrapping must not affect the reported sequence length.
    #[test]
    fn test_streaming_multiline_sequence() {
        let mut hasher = FastaStreamHasher::new();
        hasher.update(b">chr1\nACGT\nTGCA\nAAAA\n").expect("update");
        let collection = hasher.finish().expect("finish");

        assert_eq!(collection.sequences.len(), 1);
        assert_eq!(collection.sequences[0].metadata().length, 12);
    }

    // Finishing without any input yields an empty collection, not an error.
    #[test]
    fn test_streaming_empty() {
        let hasher = FastaStreamHasher::new();
        let collection = hasher.finish().expect("finish");
        assert_eq!(collection.sequences.len(), 0);
    }

    // Pins digests against externally known reference values.
    #[test]
    fn test_streaming_known_digest() {
        let mut hasher = FastaStreamHasher::new();
        hasher.update(b">chrX\nTTGGGGAA\n").expect("update");
        let collection = hasher.finish().expect("finish");

        assert_eq!(
            collection.sequences[0].metadata().sha512t24u,
            "iYtREV555dUFKg2_agSJW6suquUyPpMw"
        );
        assert_eq!(
            collection.sequences[0].metadata().md5,
            "5f63cfaa3ef61f88c9635fb9d18ec945"
        );
        assert_eq!(
            collection.sequences[0].metadata().alphabet,
            AlphabetType::Dna2bit
        );
    }

    // Gzip input delivered whole is auto-detected and decompressed.
    #[test]
    fn test_streaming_gzipped() {
        use flate2::Compression;
        use flate2::write::GzEncoder;

        let fasta = b">chr1\nACGT\n";
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(fasta).expect("compress");
        let compressed = encoder.finish().expect("finish compression");

        let mut hasher = FastaStreamHasher::new();
        hasher.update(&compressed).expect("update");
        let collection = hasher.finish().expect("finish");

        assert_eq!(collection.sequences.len(), 1);
        assert_eq!(collection.sequences[0].metadata().name, "chr1");
        assert_eq!(collection.sequences[0].metadata().length, 4);
    }

    // Gzip input split into small chunks still detects and decompresses.
    #[test]
    fn test_streaming_gzipped_chunked() {
        use flate2::Compression;
        use flate2::write::GzEncoder;

        let fasta = b">chr1\nACGTTGCA\n>chr2\nGGGGAAAA\n";
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(fasta).expect("compress");
        let compressed = encoder.finish().expect("finish compression");

        let mut hasher = FastaStreamHasher::new();
        for chunk in compressed.chunks(5) {
            hasher.update(chunk).expect("update chunk");
        }
        let collection = hasher.finish().expect("finish");

        assert_eq!(collection.sequences.len(), 2);
        assert_eq!(collection.sequences[0].metadata().name, "chr1");
        assert_eq!(collection.sequences[0].metadata().length, 8);
        assert_eq!(collection.sequences[1].metadata().name, "chr2");
        assert_eq!(collection.sequences[1].metadata().length, 8);
    }

    // The progress accessors track mid-stream parsing state.
    #[test]
    fn test_streaming_progress() {
        let mut hasher = FastaStreamHasher::new();

        assert_eq!(hasher.sequence_count(), 0);
        assert!(!hasher.in_sequence());
        assert!(hasher.current_sequence_name().is_none());

        hasher.update(b">chr1\n").expect("header");
        assert!(hasher.in_sequence());
        assert_eq!(hasher.current_sequence_name(), Some("chr1"));
        assert_eq!(hasher.current_sequence_length(), 0);

        hasher.update(b"ACGT\n").expect("sequence");
        assert_eq!(hasher.current_sequence_length(), 4);

        hasher.update(b">chr2\n").expect("next header");
        assert_eq!(hasher.sequence_count(), 1);
        assert_eq!(hasher.current_sequence_name(), Some("chr2"));
    }

    // Top-level digest must be invariant across arbitrary chunk sizes.
    #[test]
    fn test_streaming_chunked_matches_batch() {
        let fasta_data = b">chrX\nTTGGGGAA\n>chr1\nGGAA\n>chr2\nGCGC\n";

        let batch_result = digest_fasta_bytes(fasta_data).expect("batch");

        for chunk_size in [1, 2, 3, 5, 7, 11, 13, 17] {
            let mut hasher = FastaStreamHasher::new();
            for chunk in fasta_data.chunks(chunk_size) {
                hasher.update(chunk).expect("streaming chunk");
            }
            let stream_result = hasher.finish().expect("finish");

            assert_eq!(
                batch_result.metadata.digest, stream_result.metadata.digest,
                "Mismatch with chunk size {}",
                chunk_size
            );
        }
    }
}