use super::{ErrorModel, ReadEnd};
const DECAY_START_FRACTION: f64 = 0.7;
const DEFAULT_R2_MULTIPLIER: f64 = 1.5;
#[derive(Debug, Clone, Copy)]
struct CycleParams {
p_err: f64,
base_qual: u8,
}
pub struct IlluminaErrorModel {
read_length: usize,
min_error_rate: f64,
max_error_rate: f64,
r2_multiplier: f64,
decay_start: usize,
r1_params: Vec<CycleParams>,
r2_params: Vec<CycleParams>,
}
impl IlluminaErrorModel {
#[must_use]
pub fn new(read_length: usize, min_error_rate: f64, max_error_rate: f64) -> Self {
Self::with_r2_multiplier(read_length, min_error_rate, max_error_rate, DEFAULT_R2_MULTIPLIER)
}
#[must_use]
pub fn with_r2_multiplier(
read_length: usize,
min_error_rate: f64,
max_error_rate: f64,
r2_multiplier: f64,
) -> Self {
assert!(read_length > 0, "read_length must be > 0");
assert!(
min_error_rate <= max_error_rate,
"min_error_rate ({min_error_rate}) must be <= max_error_rate ({max_error_rate})"
);
#[expect(clippy::cast_possible_truncation, reason = "product is bounded by read_length")]
#[expect(clippy::cast_sign_loss, reason = "product is non-negative")]
let decay_start = (read_length as f64 * DECAY_START_FRACTION).round() as usize;
let mut model = Self {
read_length,
min_error_rate,
max_error_rate,
r2_multiplier,
decay_start,
r1_params: Vec::new(),
r2_params: Vec::new(),
};
model.r1_params =
(0..read_length).map(|c| model.compute_cycle_params(c, ReadEnd::Read1)).collect();
model.r2_params =
(0..read_length).map(|c| model.compute_cycle_params(c, ReadEnd::Read2)).collect();
model
}
}
impl IlluminaErrorModel {
fn compute_error_probability(&self, cycle: usize, read_end: ReadEnd) -> f64 {
let base_rate = if cycle < self.decay_start {
self.min_error_rate
} else {
let ramp_len = self.read_length.saturating_sub(self.decay_start).max(1);
let progress = (cycle - self.decay_start) as f64 / ramp_len as f64;
self.min_error_rate + (self.max_error_rate - self.min_error_rate) * progress.min(1.0)
};
match read_end {
ReadEnd::Read1 => base_rate,
ReadEnd::Read2 => (base_rate * self.r2_multiplier).min(1.0),
}
}
fn compute_cycle_params(&self, cycle: usize, read_end: ReadEnd) -> CycleParams {
let p_err = self.compute_error_probability(cycle, read_end);
#[expect(clippy::cast_possible_truncation, reason = "clamped to [2, 41]")]
#[expect(clippy::cast_sign_loss, reason = "clamped to [2, 41]")]
let base_qual = if p_err > 0.0 {
(-10.0 * p_err.log10()).round().clamp(2.0, 41.0) as u8 + 33
} else {
41 + 33
};
CycleParams { p_err, base_qual }
}
#[inline]
fn cycle_params(&self, cycle: usize, read_end: ReadEnd) -> CycleParams {
match read_end {
ReadEnd::Read1 => self.r1_params[cycle],
ReadEnd::Read2 => self.r2_params[cycle],
}
}
}
impl ErrorModel for IlluminaErrorModel {
fn error_probability(&self, cycle: usize, read_end: ReadEnd) -> f64 {
self.cycle_params(cycle, read_end).p_err
}
fn base_quality_phred33(&self, cycle: usize, read_end: ReadEnd) -> u8 {
self.cycle_params(cycle, read_end).base_qual
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_error_rate_at_start() {
let model = IlluminaErrorModel::new(150, 0.001, 0.01);
let p = model.error_probability(0, ReadEnd::Read1);
assert!((p - 0.001).abs() < 1e-10);
}
#[test]
fn test_error_rate_flat_before_decay() {
let model = IlluminaErrorModel::new(150, 0.001, 0.01);
for cycle in 0..100 {
let p = model.error_probability(cycle, ReadEnd::Read1);
assert!((p - 0.001).abs() < 1e-10, "cycle {cycle}: expected 0.001, got {p}");
}
}
#[test]
fn test_error_rate_ramps_to_max() {
let model = IlluminaErrorModel::new(150, 0.001, 0.01);
let p = model.error_probability(149, ReadEnd::Read1);
assert!((p - 0.01).abs() < 0.001, "expected ~0.01 at last cycle, got {p}");
}
#[test]
fn test_error_rate_increases_monotonically() {
let model = IlluminaErrorModel::new(150, 0.001, 0.01);
let mut prev = 0.0;
for cycle in 0..150 {
let p = model.error_probability(cycle, ReadEnd::Read1);
assert!(p >= prev, "cycle {cycle}: {p} < {prev}");
prev = p;
}
}
#[test]
fn test_r2_higher_than_r1() {
let model = IlluminaErrorModel::new(150, 0.001, 0.01);
for cycle in [0, 50, 100, 149] {
let p1 = model.error_probability(cycle, ReadEnd::Read1);
let p2 = model.error_probability(cycle, ReadEnd::Read2);
assert!(p2 > p1, "cycle {cycle}: R2 ({p2}) should be > R1 ({p1})");
}
}
#[test]
fn test_r2_multiplier() {
let model = IlluminaErrorModel::new(150, 0.001, 0.01);
let p1 = model.error_probability(0, ReadEnd::Read1);
let p2 = model.error_probability(0, ReadEnd::Read2);
assert!((p2 - p1 * 1.5).abs() < 1e-10, "R2 should be 1.5x R1 at start");
}
#[test]
fn test_apply_errors_reproducible() {
use rand::SeedableRng;
use rand::rngs::SmallRng;
let model = IlluminaErrorModel::new(10, 0.1, 0.3);
let mut bases1 = b"ACGTACGTAC".to_vec();
let mut bases2 = b"ACGTACGTAC".to_vec();
let mut rng1 = SmallRng::seed_from_u64(42);
let mut rng2 = SmallRng::seed_from_u64(42);
let (n1, q1) =
crate::error_model::apply_errors(&model, &mut bases1, ReadEnd::Read1, &mut rng1);
let (n2, q2) =
crate::error_model::apply_errors(&model, &mut bases2, ReadEnd::Read1, &mut rng2);
assert_eq!(bases1, bases2);
assert_eq!(q1, q2);
assert_eq!(n1, n2);
}
#[test]
fn test_apply_errors_introduces_errors() {
use rand::SeedableRng;
use rand::rngs::SmallRng;
let model = IlluminaErrorModel::new(100, 0.5, 0.5); let original = vec![b'A'; 100];
let mut bases = original.clone();
let mut rng = SmallRng::seed_from_u64(123);
let (n_errors, qualities) =
crate::error_model::apply_errors(&model, &mut bases, ReadEnd::Read1, &mut rng);
assert!(n_errors > 10, "expected many errors with 50% rate, got {n_errors}");
assert_eq!(qualities.len(), 100);
assert_ne!(bases, original);
}
#[test]
fn test_apply_errors_quality_encoding() {
use rand::SeedableRng;
use rand::rngs::SmallRng;
let model = IlluminaErrorModel::new(10, 0.001, 0.01);
let mut bases = b"ACGTACGTAC".to_vec();
let mut rng = SmallRng::seed_from_u64(99);
let (_, qualities) =
crate::error_model::apply_errors(&model, &mut bases, ReadEnd::Read1, &mut rng);
for &q in &qualities {
assert!(q >= 35, "quality {q} below minimum Phred+33 (35)");
assert!(q <= 74, "quality {q} above maximum Phred+33 (74)");
}
}
#[test]
fn test_apply_errors_zero_rate() {
use rand::SeedableRng;
use rand::rngs::SmallRng;
let model = IlluminaErrorModel::new(10, 0.0, 0.0);
let original = b"ACGTACGTAC".to_vec();
let mut bases = original.clone();
let mut rng = SmallRng::seed_from_u64(42);
let (n_errors, qualities) =
crate::error_model::apply_errors(&model, &mut bases, ReadEnd::Read1, &mut rng);
assert_eq!(n_errors, 0, "zero error rate should produce no errors");
assert_eq!(bases, original, "bases should be unchanged");
assert_eq!(qualities.len(), 10);
}
#[test]
#[should_panic(expected = "read_length must be > 0")]
fn test_zero_read_length_panics() {
let _ = IlluminaErrorModel::new(0, 0.001, 0.01);
}
#[test]
#[should_panic(expected = "min_error_rate")]
fn test_min_gt_max_panics() {
let _ = IlluminaErrorModel::new(150, 0.1, 0.01);
}
}