// genomicframe_core/formats/fastq/stats.rs
//! FASTQ statistics computation
//!
//! Streaming statistics collection for sequencing read files.
//! Memory-efficient: processes millions of reads with O(1) memory for basic stats.

use crate::core::GenomicRecordIterator;
use crate::error::Result;
use crate::parallel::Mergeable;
use crate::stats::{CategoryCounter, RunningStats};

use super::reader::{FastqReader, FastqRecord, QualityEncoding};
12
/// Comprehensive statistics for FASTQ files
///
/// Built incrementally via [`FastqStats::update`]; scalar fields use O(1)
/// memory, while the distributions grow with the number of distinct
/// categories observed (lengths / quality bins / base characters).
#[derive(Debug, Clone)]
pub struct FastqStats {
    /// Total number of reads
    pub total_reads: usize,

    /// Total number of bases sequenced
    pub total_bases: usize,

    /// Read length statistics
    pub length_stats: RunningStats,

    /// Quality score statistics (mean quality per read, then averaged)
    pub mean_quality_stats: RunningStats,

    /// GC content statistics (per-read fraction in [0, 1], then averaged)
    pub gc_content_stats: RunningStats,

    /// Distribution of read lengths
    pub length_distribution: CategoryCounter<usize>,

    /// Distribution of mean quality scores (binned by truncating the mean to u8)
    pub quality_distribution: CategoryCounter<u8>,

    /// Reads with mean quality >= 20 (common threshold)
    pub high_quality_reads_q20: usize,

    /// Reads with mean quality >= 30 (high quality threshold)
    pub high_quality_reads_q30: usize,

    /// Detected quality encoding (`None` until the first record is parsed)
    pub encoding: Option<QualityEncoding>,

    /// N-base count (ambiguous bases; both 'N' and 'n' are counted)
    pub n_bases: usize,

    /// Base composition (case-sensitive: 'a' and 'A' are separate keys)
    pub base_counts: CategoryCounter<char>,
}
52
53impl Default for FastqStats {
54    fn default() -> Self {
55        Self::new()
56    }
57}
58
59impl FastqStats {
60    /// Create a new statistics collector
61    pub fn new() -> Self {
62        Self {
63            total_reads: 0,
64            total_bases: 0,
65            length_stats: RunningStats::new(),
66            mean_quality_stats: RunningStats::new(),
67            gc_content_stats: RunningStats::new(),
68            length_distribution: CategoryCounter::new(),
69            quality_distribution: CategoryCounter::new(),
70            high_quality_reads_q20: 0,
71            high_quality_reads_q30: 0,
72            encoding: None,
73            n_bases: 0,
74            base_counts: CategoryCounter::new(),
75        }
76    }
77
78    /// Update statistics with a single record (streaming)
79    ///
80    /// Optimized for single-pass processing: computes all statistics in one iteration
81    /// over sequence and quality data, avoiding repeated allocations and iterations.
82    pub fn update(&mut self, record: &FastqRecord) {
83        self.total_reads += 1;
84
85        let length = record.len();
86        self.total_bases += length;
87        self.length_stats.push(length as f64);
88        self.length_distribution.increment(length);
89
90        // Single pass over sequence and quality in parallel
91        // This eliminates 3 separate iterations (quality, GC, base counts)
92        let mut gc_count = 0;
93        let mut qual_sum = 0u32;
94
95        // Use bytes() for performance - avoids UTF-8 decoding overhead
96        for (seq_byte, qual_byte) in record.sequence.bytes().zip(record.quality.bytes()) {
97            // Calculate quality score (no Vec allocation!)
98            qual_sum += (qual_byte.saturating_sub(33)) as u32;
99
100            // Count GC bases
101            if seq_byte == b'G' || seq_byte == b'C' || seq_byte == b'g' || seq_byte == b'c' {
102                gc_count += 1;
103            }
104
105            // Track base composition (convert byte to char for counter)
106            let base = seq_byte as char;
107            self.base_counts.increment(base);
108
109            // Track N bases
110            if base == 'N' || base == 'n' {
111                self.n_bases += 1;
112            }
113        }
114
115        // Compute mean quality for this read
116        let mean_qual = if length > 0 {
117            qual_sum as f64 / length as f64
118        } else {
119            0.0
120        };
121
122        self.mean_quality_stats.push(mean_qual);
123        self.quality_distribution.increment(mean_qual as u8);
124
125        // Quality thresholds
126        if mean_qual >= 20.0 {
127            self.high_quality_reads_q20 += 1;
128        }
129        if mean_qual >= 30.0 {
130            self.high_quality_reads_q30 += 1;
131        }
132
133        // Compute GC content
134        let gc_content = if length > 0 {
135            gc_count as f64 / length as f64
136        } else {
137            0.0
138        };
139        self.gc_content_stats.push(gc_content);
140    }
141
142    /// Compute statistics from a FASTQ reader (consumes the reader)
143    pub fn compute(reader: &mut FastqReader) -> Result<Self> {
144        let mut stats = Self::new();
145
146        // Capture encoding from reader after first record is parsed
147        while let Some(record) = reader.next_record()? {
148            if stats.encoding.is_none() {
149                stats.encoding = reader.header().encoding;
150            }
151            stats.update(&record);
152        }
153
154        Ok(stats)
155    }
156
    /// Compute statistics in parallel using multiple threads
    ///
    /// This divides the FASTQ file into chunks and processes them in parallel
    /// using Rayon. Each thread computes partial statistics for its chunk,
    /// then the results are merged.
    ///
    /// # Performance
    /// - For small files (<10 MB): Use `compute()` instead (less overhead)
    /// - For large files (>100 MB): Can achieve near-linear speedup with cores
    ///
    /// # Accuracy note
    /// Each worker stops at its chunk end based on an *estimated* file
    /// position, not the reader's true offset (see `current_pos` below).
    /// NOTE(review): verify that records near chunk boundaries are counted
    /// exactly once — an over-estimate stops a worker early, an
    /// under-estimate lets it run into the next worker's chunk.
    ///
    /// # Arguments
    /// - `path`: Path to FASTQ file (can be gzipped)
    /// - `config`: Optional parallel configuration (defaults to all CPUs)
    ///
    /// # Example
    /// ```ignore
    /// use genomicframe_core::formats::fastq::FastqStats;
    /// use genomicframe_core::parallel::ParallelConfig;
    ///
    /// let config = ParallelConfig::new().with_threads(4);
    /// let stats = FastqStats::par_compute("large.fastq", Some(config))?;
    /// ```
    pub fn par_compute<P: AsRef<std::path::Path>>(
        path: P,
        config: Option<crate::parallel::ParallelConfig>,
    ) -> Result<Self> {
        use crate::parallel::{calculate_chunks, find_record_boundary};
        use rayon::prelude::*;
        use std::fs::File;
        use std::io::{BufReader, Seek};

        let config = config.unwrap_or_default();
        let path = path.as_ref();

        // Get file size
        let file_size = std::fs::metadata(path)?.len();

        // For small files, use sequential processing (parallel overhead
        // would dominate). `as u64` is a widening cast from usize: safe.
        if file_size < config.min_chunk_size as u64 {
            let mut reader = FastqReader::from_path(path)?;
            return Self::compute(&mut reader);
        }

        // Calculate chunks
        let chunks = calculate_chunks(file_size, &config);

        // Get or create a cached thread pool with the configured number of threads
        let pool = crate::parallel::get_thread_pool(config.threads())?;

        // Process chunks in parallel using the cached thread pool
        let partial_stats: Vec<Self> = pool.install(|| {
            chunks
                .par_iter()
                .map(|chunk| -> Result<Self> {
                // Open file for this thread (each worker needs its own handle)
                let file = File::open(path)?;
                let mut reader = BufReader::new(file);

                // Find the actual start of the first complete record in this chunk
                let start_pos = find_record_boundary(&mut reader, chunk.start)?;

                let start_pos = match start_pos {
                    Some(pos) => pos,
                    None => return Ok(Self::new()), // No records in this chunk
                };

                // Create FASTQ reader starting at this position
                reader.seek(std::io::SeekFrom::Start(start_pos))?;
                let mut fastq_reader = FastqReader::from_buffer(reader);

                // Process records until we hit the chunk end
                let mut stats = Self::new();
                let mut current_pos = start_pos;

                while let Some(record) = fastq_reader.next_record()? {
                    // Capture encoding from the first record
                    if stats.encoding.is_none() {
                        stats.encoding = fastq_reader.header().encoding;
                    }

                    stats.update(&record);

                    // Estimate current position (each FASTQ record is ~4 lines)
                    // This is approximate but avoids expensive tell() calls.
                    // `len()*2` approximates sequence + quality lines; `+40`
                    // approximates header + separator lines.
                    current_pos += record.len() as u64 * 2 + 40; // Rough estimate

                    // Stop when we reach the end of our chunk
                    if current_pos >= chunk.end {
                        break;
                    }
                }

                Ok(stats)
            })
            .collect::<Result<Vec<_>>>()
        })?; // Close pool.install()

        // Merge all partial statistics
        Self::merge_all(partial_stats)
            .ok_or_else(|| crate::error::Error::InvalidInput("No statistics computed".to_string()))
    }
258
    /// Mean read length (presumably `None` before any reads are recorded —
    /// see `RunningStats::mean`)
    pub fn mean_length(&self) -> Option<f64> {
        self.length_stats.mean()
    }

    /// Standard deviation of read length
    pub fn std_length(&self) -> Option<f64> {
        self.length_stats.std_dev()
    }

    /// Min read length (as f64, since lengths are stored as f64 samples)
    pub fn min_length(&self) -> Option<f64> {
        self.length_stats.min()
    }

    /// Max read length
    pub fn max_length(&self) -> Option<f64> {
        self.length_stats.max()
    }

    /// Mean of per-read mean quality scores
    pub fn mean_quality(&self) -> Option<f64> {
        self.mean_quality_stats.mean()
    }

    /// Standard deviation of per-read mean quality scores
    pub fn std_quality(&self) -> Option<f64> {
        self.mean_quality_stats.std_dev()
    }

    /// Mean GC content across all reads (fraction in [0, 1], not a percent)
    pub fn mean_gc_content(&self) -> Option<f64> {
        self.gc_content_stats.mean()
    }

    /// Standard deviation of per-read GC content
    pub fn std_gc_content(&self) -> Option<f64> {
        self.gc_content_stats.std_dev()
    }
298
299    /// Percentage of high quality reads (Q >= 20)
300    pub fn percent_q20(&self) -> f64 {
301        if self.total_reads == 0 {
302            0.0
303        } else {
304            (self.high_quality_reads_q20 as f64 / self.total_reads as f64) * 100.0
305        }
306    }
307
308    /// Percentage of high quality reads (Q >= 30)
309    pub fn percent_q30(&self) -> f64 {
310        if self.total_reads == 0 {
311            0.0
312        } else {
313            (self.high_quality_reads_q30 as f64 / self.total_reads as f64) * 100.0
314        }
315    }
316
317    /// Percentage of N bases (ambiguous)
318    pub fn percent_n(&self) -> f64 {
319        if self.total_bases == 0 {
320            0.0
321        } else {
322            (self.n_bases as f64 / self.total_bases as f64) * 100.0
323        }
324    }
325
326    /// Get base composition as percentages
327    pub fn base_composition(&self) -> BaseComposition {
328        let total = self.total_bases as f64;
329        if total == 0.0 {
330            return BaseComposition::default();
331        }
332
333        BaseComposition {
334            a_percent: (self.base_counts.get(&'A') as f64 / total) * 100.0,
335            c_percent: (self.base_counts.get(&'C') as f64 / total) * 100.0,
336            g_percent: (self.base_counts.get(&'G') as f64 / total) * 100.0,
337            t_percent: (self.base_counts.get(&'T') as f64 / total) * 100.0,
338            n_percent: self.percent_n(),
339        }
340    }
341
342    /// Print a human-readable summary of statistics
343    pub fn print_summary(&self) {
344        println!("=== FASTQ Statistics ===\n");
345
346        println!("Basic Counts:");
347        println!("  Total reads:     {:>12}", self.total_reads);
348        println!("  Total bases:     {:>12}", self.total_bases);
349        println!();
350
351        println!("Read Length:");
352        println!("  Mean:            {:>12.2}", self.mean_length().unwrap_or(0.0));
353        println!("  Std Dev:         {:>12.2}", self.std_length().unwrap_or(0.0));
354        println!(
355            "  Min:             {:>12.0}",
356            self.min_length().unwrap_or(0.0)
357        );
358        println!(
359            "  Max:             {:>12.0}",
360            self.max_length().unwrap_or(0.0)
361        );
362        println!();
363
364        println!("Quality Scores:");
365        if let Some(encoding) = self.encoding {
366            println!("  Encoding:        {:>12?}", encoding);
367        }
368        println!(
369            "  Mean quality:    {:>12.2}",
370            self.mean_quality().unwrap_or(0.0)
371        );
372        println!(
373            "  Std Dev:         {:>12.2}",
374            self.std_quality().unwrap_or(0.0)
375        );
376        println!("  Q >= 20:         {:>11.1}%", self.percent_q20());
377        println!("  Q >= 30:         {:>11.1}%", self.percent_q30());
378        println!();
379
380        println!("Base Composition:");
381        let composition = self.base_composition();
382        println!("  A:               {:>11.1}%", composition.a_percent);
383        println!("  C:               {:>11.1}%", composition.c_percent);
384        println!("  G:               {:>11.1}%", composition.g_percent);
385        println!("  T:               {:>11.1}%", composition.t_percent);
386        println!("  N (ambiguous):   {:>11.1}%", composition.n_percent);
387        println!();
388
389        println!("GC Content:");
390        println!(
391            "  Mean:            {:>11.1}%",
392            self.mean_gc_content().unwrap_or(0.0) * 100.0
393        );
394        println!(
395            "  Std Dev:         {:>11.1}%",
396            self.std_gc_content().unwrap_or(0.0) * 100.0
397        );
398        println!();
399
400        // Top length bins (sorted by count descending)
401        println!("Top Read Lengths:");
402        let mut length_vec: Vec<_> = self.length_distribution.iter().collect();
403        length_vec.sort_by(|a, b| b.1.cmp(a.1)); // Sort by count descending
404        for (length, count) in length_vec.iter().take(10) {
405            let percent = (**count as f64 / self.total_reads as f64) * 100.0;
406            println!("  {:>4}bp: {:>10} ({:>5.1}%)", length, count, percent);
407        }
408        println!();
409
410        // Quality distribution (binned, sorted by quality score)
411        println!("Quality Distribution (binned):");
412        let mut qual_vec: Vec<_> = self.quality_distribution.iter().collect();
413        qual_vec.sort_by_key(|a| a.0); // Sort by quality score ascending
414        for (qual_bin, count) in qual_vec.iter().take(10) {
415            let percent = (**count as f64 / self.total_reads as f64) * 100.0;
416            println!("  Q{:>2}: {:>10} ({:>5.1}%)", qual_bin, count, percent);
417        }
418    }
419}
420
/// Base composition percentages
///
/// All fields are percentages in [0, 100]. Produced by
/// [`FastqStats::base_composition`]; the four base fields reflect uppercase
/// counts only, while `n_percent` covers both 'N' and 'n'.
#[derive(Debug, Clone, Default)]
pub struct BaseComposition {
    /// Percentage of 'A' bases
    pub a_percent: f64,
    /// Percentage of 'C' bases
    pub c_percent: f64,
    /// Percentage of 'G' bases
    pub g_percent: f64,
    /// Percentage of 'T' bases
    pub t_percent: f64,
    /// Percentage of ambiguous bases ('N'/'n')
    pub n_percent: f64,
}
430
431/// Implementation of Mergeable for parallel statistics computation
432///
433/// Merges FASTQ statistics from multiple threads/chunks.
434/// All statistics are correctly combined using the underlying
435/// Mergeable implementations for RunningStats and CategoryCounter.
436impl Mergeable for FastqStats {
437    fn merge(&mut self, other: Self) {
438        // Merge simple counters
439        self.total_reads += other.total_reads;
440        self.total_bases += other.total_bases;
441        self.high_quality_reads_q20 += other.high_quality_reads_q20;
442        self.high_quality_reads_q30 += other.high_quality_reads_q30;
443        self.n_bases += other.n_bases;
444
445        // Merge running statistics
446        self.length_stats.merge(other.length_stats);
447        self.mean_quality_stats.merge(other.mean_quality_stats);
448        self.gc_content_stats.merge(other.gc_content_stats);
449
450        // Merge categorical data
451        self.length_distribution.merge(other.length_distribution);
452        self.quality_distribution.merge(other.quality_distribution);
453        self.base_counts.merge(other.base_counts);
454
455        // Merge encoding (prefer first non-None value)
456        if self.encoding.is_none() {
457            self.encoding = other.encoding;
458        }
459    }
460}
461
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Write a 3-read FASTQ fixture: 8bp + 8bp + 12bp = 28 bases total.
    fn create_test_fastq() -> NamedTempFile {
        let mut temp_file = NamedTempFile::new().unwrap();

        // Write 3 reads with different qualities
        writeln!(temp_file, "@READ1").unwrap();
        writeln!(temp_file, "ACGTACGT").unwrap(); // 8bp, 50% GC
        writeln!(temp_file, "+").unwrap();
        writeln!(temp_file, "IIIIIIII").unwrap(); // High quality ('I' = Phred+33 Q40)

        writeln!(temp_file, "@READ2").unwrap();
        writeln!(temp_file, "AAAATTTT").unwrap(); // 8bp, 0% GC
        writeln!(temp_file, "+").unwrap();
        writeln!(temp_file, "########").unwrap(); // Low quality ('#' = Phred+33 Q2)

        writeln!(temp_file, "@READ3").unwrap();
        writeln!(temp_file, "GGGGCCCCNNNN").unwrap(); // 12bp, 8/12 ≈ 66.7% GC (N counted in denominator)
        writeln!(temp_file, "+").unwrap();
        writeln!(temp_file, "IIIIIIIIIIII").unwrap(); // High quality

        temp_file.flush().unwrap();
        temp_file
    }

    #[test]
    fn test_fastq_stats_basic() -> Result<()> {
        let temp_file = create_test_fastq();
        let mut reader = FastqReader::from_path(temp_file.path())?;
        let stats = FastqStats::compute(&mut reader)?;

        assert_eq!(stats.total_reads, 3);
        assert_eq!(stats.total_bases, 28); // 8 + 8 + 12

        Ok(())
    }

    #[test]
    fn test_fastq_stats_length() -> Result<()> {
        let temp_file = create_test_fastq();
        let mut reader = FastqReader::from_path(temp_file.path())?;
        let stats = FastqStats::compute(&mut reader)?;

        // Mean length: (8 + 8 + 12) / 3 = 9.33
        let mean = stats.mean_length().unwrap();
        assert!((mean - 9.33).abs() < 0.1);

        assert_eq!(stats.min_length().unwrap(), 8.0);
        assert_eq!(stats.max_length().unwrap(), 12.0);

        Ok(())
    }

    #[test]
    fn test_fastq_stats_quality() -> Result<()> {
        let temp_file = create_test_fastq();
        let mut reader = FastqReader::from_path(temp_file.path())?;
        let stats = FastqStats::compute(&mut reader)?;

        // At least 2 out of 3 reads should be Q30+ (READ1 and READ3)
        assert!(stats.high_quality_reads_q30 >= 2);
        assert!(stats.percent_q30() >= 60.0);

        Ok(())
    }

    #[test]
    fn test_fastq_stats_gc_content() -> Result<()> {
        let temp_file = create_test_fastq();
        let mut reader = FastqReader::from_path(temp_file.path())?;
        let stats = FastqStats::compute(&mut reader)?;

        // READ1: 4/8 = 50%, READ2: 0/8 = 0%, READ3: 8/12 = 66.7%
        // Mean: (0.5 + 0.0 + 0.667) / 3 = 0.389 = 38.9%
        let mean_gc = stats.mean_gc_content().unwrap();
        assert!((mean_gc - 0.389).abs() < 0.01);

        Ok(())
    }

    #[test]
    fn test_fastq_stats_base_composition() -> Result<()> {
        let temp_file = create_test_fastq();
        let mut reader = FastqReader::from_path(temp_file.path())?;
        let stats = FastqStats::compute(&mut reader)?;

        let composition = stats.base_composition();

        // READ1: 2A, 2C, 2G, 2T = 8 bases
        // READ2: 4A, 0C, 0G, 4T = 8 bases
        // READ3: 0A, 4C, 4G, 0T, 4N = 12 bases
        // Total: 6A, 6C, 6G, 6T, 4N = 28 bases
        assert!((composition.a_percent - 21.4).abs() < 1.0); // 6/28 ≈ 21.4%
        assert!((composition.c_percent - 21.4).abs() < 1.0); // 6/28 ≈ 21.4%
        assert!((composition.g_percent - 21.4).abs() < 1.0); // 6/28 ≈ 21.4%
        assert!((composition.t_percent - 21.4).abs() < 1.0); // 6/28 ≈ 21.4%
        assert!((composition.n_percent - 14.3).abs() < 1.0); // 4/28 ≈ 14.3%

        Ok(())
    }

    #[test]
    fn test_fastq_stats_n_bases() -> Result<()> {
        let temp_file = create_test_fastq();
        let mut reader = FastqReader::from_path(temp_file.path())?;
        let stats = FastqStats::compute(&mut reader)?;

        assert_eq!(stats.n_bases, 4); // 4 N's in READ3
        assert!((stats.percent_n() - 14.3).abs() < 1.0); // 4/28 ≈ 14.3%

        Ok(())
    }

    #[test]
    fn test_fastq_stats_streaming() -> Result<()> {
        let temp_file = create_test_fastq();
        let mut reader = FastqReader::from_path(temp_file.path())?;

        // Manually stream and update, bypassing `compute`
        let mut stats = FastqStats::new();
        let mut count = 0;
        while let Some(record) = reader.next_record()? {
            stats.update(&record);
            count += 1;
        }

        assert_eq!(count, 3);
        assert_eq!(stats.total_reads, 3);

        Ok(())
    }

    #[test]
    fn test_fastq_stats_merge() -> Result<()> {
        let temp_file = create_test_fastq();
        let mut reader = FastqReader::from_path(temp_file.path())?;

        // Create two partial stats, mimicking two parallel workers
        let mut stats1 = FastqStats::new();
        let mut stats2 = FastqStats::new();

        // Add first record to stats1
        if let Some(record) = reader.next_record()? {
            stats1.update(&record);
        }

        // Add remaining records to stats2
        while let Some(record) = reader.next_record()? {
            stats2.update(&record);
        }

        // Verify partial stats
        assert_eq!(stats1.total_reads, 1);
        assert_eq!(stats2.total_reads, 2);

        // Merge
        stats1.merge(stats2);

        // Verify merged stats
        assert_eq!(stats1.total_reads, 3);
        assert_eq!(stats1.total_bases, 28); // 8 + 8 + 12

        Ok(())
    }
}