Skip to main content

gtars_refget/digest/
stream.rs

1//! Streaming FASTA hasher - WASM-safe, constant memory usage.
2//!
3//! This module provides a streaming FASTA parser that can process data in chunks,
4//! maintaining constant memory usage regardless of file size. Ideal for WASM
5//! environments where large files are fetched in chunks.
6
7use md5::Md5;
8use sha2::{Digest, Sha512};
9use std::io::{self, Write};
10
11use super::alphabet::AlphabetGuesser;
12use super::fasta::parse_fasta_header;
13use super::types::{
14    SequenceCollection, SequenceCollectionMetadata, SequenceMetadata, SequenceRecord,
15};
16
/// Streaming state for FASTA processing.
///
/// The parser starts in [`ParserState::AwaitingHeader`]; sequence data
/// lines are only hashed after the first `>` header has been seen.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ParserState {
    /// Waiting for a header line starting with '>'
    AwaitingHeader,
    /// Currently in sequence data lines
    InSequence,
}
25
/// Inner FASTA processor that implements Write.
/// Decompressed data flows into this via the Write trait.
struct FastaProcessor {
    // Whether we are before the first header or inside sequence data.
    state: ParserState,
    // Accumulates bytes of the current line until a '\n' or '\r' arrives.
    line_buffer: Vec<u8>,
    // Name of the sequence currently being hashed (None before first header).
    current_name: Option<String>,
    // Optional description parsed from the current header line.
    current_description: Option<String>,
    // Number of non-whitespace sequence bytes seen for the current sequence.
    current_length: usize,
    // Rolling SHA-512 over the uppercased sequence bytes (for sha512t24u).
    sha512_hasher: Sha512,
    // Rolling MD5 over the uppercased sequence bytes.
    md5_hasher: Md5,
    // Infers the alphabet (DNA/protein/...) from observed sequence bytes.
    alphabet_guesser: AlphabetGuesser,
    // Metadata records for all sequences finalized so far.
    sequences: Vec<SequenceRecord>,
    /// Store any processing error (Write trait can't return custom errors)
    processing_error: Option<String>,
}
41
42impl FastaProcessor {
43    fn new() -> Self {
44        Self {
45            state: ParserState::AwaitingHeader,
46            line_buffer: Vec::with_capacity(8192),
47            current_name: None,
48            current_description: None,
49            current_length: 0,
50            sha512_hasher: Sha512::new(),
51            md5_hasher: Md5::new(),
52            alphabet_guesser: AlphabetGuesser::new(),
53            sequences: Vec::new(),
54            processing_error: None,
55        }
56    }
57
58    fn process_byte(&mut self, byte: u8) {
59        // Stop processing if we already have an error
60        if self.processing_error.is_some() {
61            return;
62        }
63
64        if byte == b'\n' || byte == b'\r' {
65            if let Err(e) = self.process_line() {
66                self.processing_error = Some(e.to_string());
67            }
68            self.line_buffer.clear();
69        } else {
70            self.line_buffer.push(byte);
71        }
72    }
73
74    fn process_line(&mut self) -> anyhow::Result<()> {
75        if self.line_buffer.is_empty() {
76            return Ok(());
77        }
78
79        if self.line_buffer[0] == b'>' {
80            // Header line - finalize previous sequence if any
81            if self.current_name.is_some() {
82                self.finalize_current_sequence();
83            }
84
85            // Parse header
86            let header = std::str::from_utf8(&self.line_buffer[1..])
87                .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in header: {}", e))?;
88            let (name, description) = parse_fasta_header(header);
89
90            // Start new sequence
91            self.current_name = Some(name);
92            self.current_description = description;
93            self.current_length = 0;
94            self.sha512_hasher = Sha512::new();
95            self.md5_hasher = Md5::new();
96            self.alphabet_guesser = AlphabetGuesser::new();
97            self.state = ParserState::InSequence;
98        } else if self.state == ParserState::InSequence && self.current_name.is_some() {
99            // Sequence line - uppercase and hash
100            let uppercased: Vec<u8> = self
101                .line_buffer
102                .iter()
103                .filter(|&&b| !b.is_ascii_whitespace())
104                .map(|b| b.to_ascii_uppercase())
105                .collect();
106
107            if !uppercased.is_empty() {
108                self.sha512_hasher.update(&uppercased);
109                self.md5_hasher.update(&uppercased);
110                self.alphabet_guesser.update(&uppercased);
111                self.current_length += uppercased.len();
112            }
113        }
114
115        Ok(())
116    }
117
118    fn finalize_current_sequence(&mut self) {
119        if let Some(name) = self.current_name.take() {
120            let sha512 = base64_url::encode(&self.sha512_hasher.clone().finalize()[0..24]);
121            let md5 = format!("{:x}", self.md5_hasher.clone().finalize());
122            let alphabet = self.alphabet_guesser.guess();
123
124            let metadata = SequenceMetadata {
125                name,
126                description: self.current_description.take(),
127                length: self.current_length,
128                sha512t24u: sha512,
129                md5,
130                alphabet,
131                fai: None,
132            };
133
134            self.sequences.push(SequenceRecord::Stub(metadata));
135        }
136    }
137
138    fn finish(mut self) -> anyhow::Result<SequenceCollection> {
139        // Check for any stored processing error
140        if let Some(err) = self.processing_error {
141            return Err(anyhow::anyhow!("Processing error: {}", err));
142        }
143
144        // Process any remaining data in the line buffer
145        if !self.line_buffer.is_empty() {
146            self.process_line()?;
147        }
148
149        // Finalize the last sequence
150        if self.current_name.is_some() {
151            self.finalize_current_sequence();
152        }
153
154        // Build the collection
155        let metadata = SequenceCollectionMetadata::from_sequences(&self.sequences, None);
156
157        Ok(SequenceCollection {
158            metadata,
159            sequences: self.sequences,
160        })
161    }
162}
163
164impl Write for FastaProcessor {
165    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
166        for &byte in buf {
167            self.process_byte(byte);
168            // Check for processing errors after each byte
169            if let Some(ref err) = self.processing_error {
170                return Err(io::Error::new(io::ErrorKind::InvalidData, err.clone()));
171            }
172        }
173        Ok(buf.len())
174    }
175
176    fn flush(&mut self) -> io::Result<()> {
177        Ok(())
178    }
179}
180
181/// State for format detection and processing
182enum ProcessorState {
183    /// Haven't detected format yet
184    Detecting,
185    /// Plain FASTA (uncompressed)
186    Plain(FastaProcessor),
187    /// Gzipped FASTA - uses write::GzDecoder for true streaming decompression
188    Gzipped(flate2::write::GzDecoder<FastaProcessor>),
189}
190
191/// A streaming FASTA hasher that processes data chunk-by-chunk.
192///
193/// This is designed for WASM environments where files are fetched in chunks.
194/// Memory usage is constant (~100KB) regardless of file size:
195/// - Internal state: ~200 bytes (hasher state, counters)
196/// - Line buffer: ~8KB (handles long lines)
197/// - Gzip decoder state: ~32KB if compressed
198/// - Results: grows only with number of sequences (not sequence length)
199///
200/// # Example
201/// ```
202/// use gtars_refget::digest::stream::FastaStreamHasher;
203///
204/// let mut hasher = FastaStreamHasher::new();
205///
206/// // Process first chunk
207/// hasher.update(b">chr1\nACGT").expect("update");
208///
209/// // Process second chunk
210/// hasher.update(b"TGCA\n>chr2\nGGGG\n").expect("update");
211///
212/// // Finalize and get results
213/// let collection = hasher.finish().expect("finish");
214/// assert_eq!(collection.sequences.len(), 2);
215/// ```
216pub struct FastaStreamHasher {
217    state: ProcessorState,
218}
219
220impl FastaStreamHasher {
221    /// Create a new streaming FASTA hasher.
222    pub fn new() -> Self {
223        Self {
224            state: ProcessorState::Detecting,
225        }
226    }
227
228    /// Process a chunk of FASTA data.
229    ///
230    /// This method can be called multiple times with successive chunks of data.
231    /// Handles both plain text and gzip-compressed FASTA with true streaming
232    /// decompression (constant memory usage).
233    ///
234    /// # Arguments
235    /// * `chunk` - A slice of bytes from the FASTA file
236    ///
237    /// # Returns
238    /// Ok(()) on success, Err on parsing error
239    pub fn update(&mut self, chunk: &[u8]) -> anyhow::Result<()> {
240        if chunk.is_empty() {
241            return Ok(());
242        }
243
244        // Detect format on first non-empty chunk
245        if matches!(self.state, ProcessorState::Detecting) {
246            let is_gzipped = chunk.len() >= 2 && chunk[0] == 0x1f && chunk[1] == 0x8b;
247
248            if is_gzipped {
249                // Create GzDecoder wrapping a FastaProcessor
250                // Decompressed data flows directly into the processor
251                let processor = FastaProcessor::new();
252                let decoder = flate2::write::GzDecoder::new(processor);
253                self.state = ProcessorState::Gzipped(decoder);
254            } else {
255                self.state = ProcessorState::Plain(FastaProcessor::new());
256            }
257        }
258
259        // Process the chunk
260        match &mut self.state {
261            ProcessorState::Detecting => unreachable!(),
262            ProcessorState::Plain(processor) => {
263                processor.write_all(chunk)?;
264            }
265            ProcessorState::Gzipped(decoder) => {
266                // Write compressed data to decoder
267                // Decoder decompresses and writes to inner FastaProcessor
268                decoder.write_all(chunk)?;
269            }
270        }
271
272        Ok(())
273    }
274
275    /// Finalize processing and return the SequenceCollection.
276    ///
277    /// This must be called after all chunks have been processed via `update()`.
278    pub fn finish(self) -> anyhow::Result<SequenceCollection> {
279        match self.state {
280            ProcessorState::Detecting => {
281                // No data was ever provided - return empty collection
282                let metadata = SequenceCollectionMetadata::from_sequences(&[], None);
283                Ok(SequenceCollection {
284                    metadata,
285                    sequences: Vec::new(),
286                })
287            }
288            ProcessorState::Plain(processor) => processor.finish(),
289            ProcessorState::Gzipped(decoder) => {
290                // Finish decompression and get the inner processor
291                let processor = decoder
292                    .finish()
293                    .map_err(|e| anyhow::anyhow!("Gzip decompression error: {}", e))?;
294                processor.finish()
295            }
296        }
297    }
298
299    /// Get the current number of completed sequences.
300    pub fn sequence_count(&self) -> usize {
301        match &self.state {
302            ProcessorState::Detecting => 0,
303            ProcessorState::Plain(p) => p.sequences.len(),
304            ProcessorState::Gzipped(d) => d.get_ref().sequences.len(),
305        }
306    }
307
308    /// Check if currently processing a sequence.
309    pub fn in_sequence(&self) -> bool {
310        match &self.state {
311            ProcessorState::Detecting => false,
312            ProcessorState::Plain(p) => p.current_name.is_some(),
313            ProcessorState::Gzipped(d) => d.get_ref().current_name.is_some(),
314        }
315    }
316
317    /// Get the name of the sequence currently being processed (if any).
318    pub fn current_sequence_name(&self) -> Option<&str> {
319        match &self.state {
320            ProcessorState::Detecting => None,
321            ProcessorState::Plain(p) => p.current_name.as_deref(),
322            ProcessorState::Gzipped(d) => d.get_ref().current_name.as_deref(),
323        }
324    }
325
326    /// Get the current length of the sequence being processed.
327    pub fn current_sequence_length(&self) -> usize {
328        match &self.state {
329            ProcessorState::Detecting => 0,
330            ProcessorState::Plain(p) => p.current_length,
331            ProcessorState::Gzipped(d) => d.get_ref().current_length,
332        }
333    }
334}
335
impl Default for FastaStreamHasher {
    /// Equivalent to [`FastaStreamHasher::new`].
    fn default() -> Self {
        Self::new()
    }
}
341
#[cfg(test)]
mod tests {
    //! Tests cover single-chunk, chunked, and gzipped streaming, and verify
    //! that streaming results match batch processing exactly (including
    //! golden sha512t24u/md5 digest values).

    use super::*;
    use crate::digest::alphabet::AlphabetType;
    use crate::digest::fasta::digest_fasta_bytes;

    // Two sequences in a single update call.
    #[test]
    fn test_streaming_basic() {
        let mut hasher = FastaStreamHasher::new();
        hasher
            .update(b">chr1\nACGT\n>chr2\nTGCA\n")
            .expect("update");
        let collection = hasher.finish().expect("finish");

        assert_eq!(collection.sequences.len(), 2);
        assert_eq!(collection.sequences[0].metadata().name, "chr1");
        assert_eq!(collection.sequences[0].metadata().length, 4);
        assert_eq!(collection.sequences[1].metadata().name, "chr2");
        assert_eq!(collection.sequences[1].metadata().length, 4);
    }

    // Chunk boundaries in the middle of sequence lines must not change results.
    #[test]
    fn test_streaming_chunked() {
        let mut hasher = FastaStreamHasher::new();

        // Split data across multiple chunks
        hasher.update(b">chr1\nAC").expect("chunk 1");
        hasher.update(b"GT\n>chr2\n").expect("chunk 2");
        hasher.update(b"TGCA\n").expect("chunk 3");

        let collection = hasher.finish().expect("finish");

        assert_eq!(collection.sequences.len(), 2);
        assert_eq!(collection.sequences[0].metadata().name, "chr1");
        assert_eq!(collection.sequences[0].metadata().length, 4);
        assert_eq!(collection.sequences[1].metadata().name, "chr2");
        assert_eq!(collection.sequences[1].metadata().length, 4);
    }

    // A chunk boundary inside a header line must still parse name/description.
    #[test]
    fn test_streaming_split_header() {
        let mut hasher = FastaStreamHasher::new();

        // Header split across chunks
        hasher.update(b">ch").expect("chunk 1");
        hasher.update(b"r1 description\nACGT\n").expect("chunk 2");

        let collection = hasher.finish().expect("finish");

        assert_eq!(collection.sequences.len(), 1);
        assert_eq!(collection.sequences[0].metadata().name, "chr1");
        assert_eq!(
            collection.sequences[0].metadata().description,
            Some("description".to_string())
        );
    }

    // Streaming must produce digests identical to the batch implementation.
    #[test]
    fn test_streaming_matches_batch() {
        // Test that streaming produces same results as batch processing
        let fasta_data = b">chrX\nTTGGGGAA\n>chr1\nGGAA\n>chr2\nGCGC\n";

        // Batch processing
        let batch_result = digest_fasta_bytes(fasta_data).expect("batch");

        // Streaming processing (single chunk)
        let mut hasher = FastaStreamHasher::new();
        hasher.update(fasta_data).expect("streaming");
        let stream_result = hasher.finish().expect("finish");

        // Compare collection-level digests
        assert_eq!(batch_result.metadata.digest, stream_result.metadata.digest);
        assert_eq!(
            batch_result.metadata.names_digest,
            stream_result.metadata.names_digest
        );
        assert_eq!(
            batch_result.metadata.sequences_digest,
            stream_result.metadata.sequences_digest
        );
        assert_eq!(
            batch_result.metadata.lengths_digest,
            stream_result.metadata.lengths_digest
        );

        // Compare per-sequence metadata pairwise
        for (batch_seq, stream_seq) in batch_result
            .sequences
            .iter()
            .zip(stream_result.sequences.iter())
        {
            assert_eq!(batch_seq.metadata().name, stream_seq.metadata().name);
            assert_eq!(batch_seq.metadata().length, stream_seq.metadata().length);
            assert_eq!(
                batch_seq.metadata().sha512t24u,
                stream_seq.metadata().sha512t24u
            );
            assert_eq!(batch_seq.metadata().md5, stream_seq.metadata().md5);
            assert_eq!(
                batch_seq.metadata().alphabet,
                stream_seq.metadata().alphabet
            );
        }
    }

    // Sequence data spread over multiple lines is concatenated.
    #[test]
    fn test_streaming_multiline_sequence() {
        let mut hasher = FastaStreamHasher::new();
        hasher.update(b">chr1\nACGT\nTGCA\nAAAA\n").expect("update");
        let collection = hasher.finish().expect("finish");

        assert_eq!(collection.sequences.len(), 1);
        assert_eq!(collection.sequences[0].metadata().length, 12);
    }

    // finish() without any update() yields an empty collection.
    #[test]
    fn test_streaming_empty() {
        let hasher = FastaStreamHasher::new();
        let collection = hasher.finish().expect("finish");
        assert_eq!(collection.sequences.len(), 0);
    }

    // Golden digest values for a known input (regression guard).
    #[test]
    fn test_streaming_known_digest() {
        let mut hasher = FastaStreamHasher::new();
        hasher.update(b">chrX\nTTGGGGAA\n").expect("update");
        let collection = hasher.finish().expect("finish");

        assert_eq!(
            collection.sequences[0].metadata().sha512t24u,
            "iYtREV555dUFKg2_agSJW6suquUyPpMw"
        );
        assert_eq!(
            collection.sequences[0].metadata().md5,
            "5f63cfaa3ef61f88c9635fb9d18ec945"
        );
        assert_eq!(
            collection.sequences[0].metadata().alphabet,
            AlphabetType::Dna2bit
        );
    }

    // Gzip-compressed input is auto-detected via the magic number.
    #[test]
    fn test_streaming_gzipped() {
        use flate2::Compression;
        use flate2::write::GzEncoder;

        let fasta = b">chr1\nACGT\n";
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(fasta).expect("compress");
        let compressed = encoder.finish().expect("finish compression");

        let mut hasher = FastaStreamHasher::new();
        hasher.update(&compressed).expect("update");
        let collection = hasher.finish().expect("finish");

        assert_eq!(collection.sequences.len(), 1);
        assert_eq!(collection.sequences[0].metadata().name, "chr1");
        assert_eq!(collection.sequences[0].metadata().length, 4);
    }

    // Compressed bytes may arrive in arbitrarily small pieces.
    #[test]
    fn test_streaming_gzipped_chunked() {
        // Test that gzipped data can be split across chunks
        use flate2::Compression;
        use flate2::write::GzEncoder;

        let fasta = b">chr1\nACGTTGCA\n>chr2\nGGGGAAAA\n";
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(fasta).expect("compress");
        let compressed = encoder.finish().expect("finish compression");

        // Split compressed data into small chunks
        let mut hasher = FastaStreamHasher::new();
        for chunk in compressed.chunks(5) {
            hasher.update(chunk).expect("update chunk");
        }
        let collection = hasher.finish().expect("finish");

        assert_eq!(collection.sequences.len(), 2);
        assert_eq!(collection.sequences[0].metadata().name, "chr1");
        assert_eq!(collection.sequences[0].metadata().length, 8);
        assert_eq!(collection.sequences[1].metadata().name, "chr2");
        assert_eq!(collection.sequences[1].metadata().length, 8);
    }

    // Progress accessors reflect state mid-stream.
    #[test]
    fn test_streaming_progress() {
        let mut hasher = FastaStreamHasher::new();

        assert_eq!(hasher.sequence_count(), 0);
        assert!(!hasher.in_sequence());
        assert!(hasher.current_sequence_name().is_none());

        hasher.update(b">chr1\n").expect("header");
        assert!(hasher.in_sequence());
        assert_eq!(hasher.current_sequence_name(), Some("chr1"));
        assert_eq!(hasher.current_sequence_length(), 0);

        hasher.update(b"ACGT\n").expect("sequence");
        assert_eq!(hasher.current_sequence_length(), 4);

        hasher.update(b">chr2\n").expect("next header");
        assert_eq!(hasher.sequence_count(), 1); // chr1 is finalized
        assert_eq!(hasher.current_sequence_name(), Some("chr2"));
    }

    // Collection digest must be invariant under every chunking of the input.
    #[test]
    fn test_streaming_chunked_matches_batch() {
        // Test that chunked streaming produces same results as batch
        let fasta_data = b">chrX\nTTGGGGAA\n>chr1\nGGAA\n>chr2\nGCGC\n";

        // Batch processing
        let batch_result = digest_fasta_bytes(fasta_data).expect("batch");

        // Streaming with various chunk sizes
        for chunk_size in [1, 2, 3, 5, 7, 11, 13, 17] {
            let mut hasher = FastaStreamHasher::new();
            for chunk in fasta_data.chunks(chunk_size) {
                hasher.update(chunk).expect("streaming chunk");
            }
            let stream_result = hasher.finish().expect("finish");

            assert_eq!(
                batch_result.metadata.digest, stream_result.metadata.digest,
                "Mismatch with chunk size {}",
                chunk_size
            );
        }
    }
}