1#![allow(
8 clippy::many_single_char_names,
9 clippy::needless_range_loop,
10 clippy::cast_possible_truncation,
11 clippy::cast_sign_loss,
12 clippy::cast_possible_wrap,
13 clippy::cast_precision_loss
14)]
15
16mod correct;
17mod count;
18mod kmer;
19
20use std::path::Path;
21
22use rayon::prelude::*;
23use rsomics_common::{Result, RsomicsError};
24use rsomics_fqgz::ChunkedWriter;
25use rsomics_seqio::{OwnedRecord, open_fastq};
26use serde::Serialize;
27
28use correct::correct_one;
29use count::{build_table, has_unique_kmer};
30
31const CHUNK_RECORDS: usize = 4096;
32
33#[derive(Debug, Clone)]
39pub struct CorrectConfig {
40 pub k: usize,
41 pub min_cov: i32,
42 pub win_multi_ec: i32,
43 pub qual_threshold: u8,
44 pub max_end_ext: i32,
45 pub max_path_diff: i32,
46 pub max_heap: usize,
47 pub drop_unique_kmer: bool,
48 pub discard_uncorrectable: bool,
49 pub fasta_out: bool,
50}
51
52impl Default for CorrectConfig {
53 fn default() -> Self {
54 Self {
55 k: 33,
56 min_cov: 3,
57 win_multi_ec: 10,
58 qual_threshold: 20,
59 max_end_ext: 5,
60 max_path_diff: 15,
61 max_heap: 100,
62 drop_unique_kmer: false,
63 discard_uncorrectable: false,
64 fasta_out: false,
65 }
66 }
67}
68
69#[derive(Debug, Default, Clone, Serialize)]
70pub struct CorrectReport {
71 pub reads_in: u64,
72 pub reads_out: u64,
73 pub reads_dropped: u64,
74 pub bases_in: u64,
75 pub bases_corrected: u64,
76}
77
78pub struct Pipeline<'cfg> {
79 pub cfg: &'cfg CorrectConfig,
80 pub compression: i32,
81}
82
83impl<'cfg> Pipeline<'cfg> {
84 #[must_use]
85 pub fn new(cfg: &'cfg CorrectConfig, compression: i32) -> Self {
86 Self { cfg, compression }
87 }
88
89 pub fn run(&self, input: &Path, output: &Path) -> Result<CorrectReport> {
93 if self.cfg.k < 11 || self.cfg.k > 63 || self.cfg.k.is_multiple_of(2) {
94 return Err(RsomicsError::ConfigError(format!(
95 "k must be odd and in 11..=63 (BFC_MAX_KMER 63), got {}",
96 self.cfg.k
97 )));
98 }
99 let ch = build_table(&[input], self.cfg)?;
100 let mode = ch.hist_mode(self.cfg.min_cov);
103 let mut reader = open_fastq(input)?;
104 let mut w = ChunkedWriter::create(output, self.compression)?;
105 let mut report = CorrectReport::default();
106 let cfg = self.cfg;
107
108 let mut chunk: Vec<OwnedRecord> = Vec::with_capacity(CHUNK_RECORDS);
109 loop {
110 chunk.clear();
111 while chunk.len() < CHUNK_RECORDS {
112 let Some(r) = reader.next() else { break };
113 chunk.push(r?);
114 }
115 if chunk.is_empty() {
116 break;
117 }
118 let out: Vec<(Option<OwnedRecord>, u64, u64)> = chunk
119 .par_drain(..)
120 .map(|rec| {
121 let bases_in = rec.seq.len() as u64;
122 if cfg.drop_unique_kmer && has_unique_kmer(cfg, &ch, &rec) {
123 return (None, bases_in, 0);
124 }
125 match correct_one(cfg, &ch, &rec, mode) {
126 Some((seq, qual)) => {
127 let corrected =
128 seq.iter().filter(|&&c| c.is_ascii_lowercase()).count() as u64;
129 (
130 Some(OwnedRecord {
131 id: rec.id,
132 seq,
133 qual: if cfg.fasta_out { Vec::new() } else { qual },
134 }),
135 bases_in,
136 corrected,
137 )
138 }
139 None if cfg.discard_uncorrectable => (None, bases_in, 0),
140 None => (Some(rec), bases_in, 0),
141 }
142 })
143 .collect();
144
145 for (rec, bin, bcorr) in out {
146 report.reads_in += 1;
147 report.bases_in += bin;
148 match rec {
149 Some(o) => {
150 report.bases_corrected += bcorr;
151 w.write_record(&o.id, &o.seq, &o.qual)?;
152 report.reads_out += 1;
153 }
154 None => report.reads_dropped += 1,
155 }
156 }
157 }
158 w.finalize()?;
159 Ok(report)
160 }
161}
162
163#[cfg(test)]
164mod tests {
165 use rsomics_seqio::OwnedRecord;
166
167 use super::{CorrectConfig, correct_one};
168 use crate::correct::{EcBase, SEQ_NT4, best_island, bfc_ec_greedy_k, seq_conv};
169 use crate::count::{CountTable, KmerMap};
170 use crate::kmer::{BfcKmer, bfc_hash_64};
171
172 #[test]
173 fn nt4_table_maps_acgt_and_n() {
174 assert_eq!(SEQ_NT4[b'A' as usize], 0);
175 assert_eq!(SEQ_NT4[b'T' as usize], 3);
176 assert_eq!(SEQ_NT4[b'g' as usize], 2);
177 assert_eq!(SEQ_NT4[b'N' as usize], 4);
178 }
179
180 #[test]
181 fn bfc_hash_64_is_invertible_shape() {
182 let mask = (1u64 << 33) - 1;
183 let a = bfc_hash_64(0x1234_5678 & mask, mask);
184 let b = bfc_hash_64(0x1234_5679 & mask, mask);
185 assert_ne!(a, b);
186 assert_eq!(a, a & mask);
187 }
188
189 #[test]
190 fn kmer_append_round_trips_forward_plane() {
191 let mut x = BfcKmer::NULL;
192 for c in [0u64, 1, 2, 3, 0, 3] {
193 x.append(5, c);
194 }
195 let mask = (1u64 << 5) - 1;
197 assert_eq!(x.x[0], x.x[0] & mask);
198 assert_eq!(x.x[1], x.x[1] & mask);
199 }
200
201 #[test]
202 fn best_island_picks_longest_solid_run() {
203 let mut s = vec![EcBase::default(); 10];
204 for (i, c) in s.iter_mut().enumerate() {
205 c.solid_end = (3..=7).contains(&i);
206 }
207 let r = best_island(3, &s);
208 assert!(r.is_some(), "a solid run must yield an island");
209 }
210
211 #[test]
212 fn no_solid_kmer_is_uncorrectable() {
213 let cfg = CorrectConfig::default();
214 let ch = CountTable {
215 k: cfg.k,
216 map: KmerMap::default(),
217 };
218 let rec = OwnedRecord {
219 id: b"r".to_vec(),
220 seq: b"ACGTACGTACGTACGTACGTACGTACGTACGTACGT".to_vec(),
221 qual: b"IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII".to_vec(),
222 };
223 assert!(correct_one(&cfg, &ch, &rec, ch.hist_mode(cfg.min_cov)).is_none());
224 }
225
226 #[test]
227 fn high_coverage_clean_read_is_unchanged() {
228 let cfg = CorrectConfig {
229 k: 11,
230 ..CorrectConfig::default()
231 };
232 let seq = b"ACGTACGTACGTGGGGCCCCAAAATTTTACGTACGT".to_vec();
233 let qual = vec![b'I'; seq.len()];
234 let rec = OwnedRecord {
235 id: b"r".to_vec(),
236 seq: seq.clone(),
237 qual: qual.clone(),
238 };
239 let mut ch = CountTable {
240 k: cfg.k,
241 map: KmerMap::default(),
242 };
243 for _ in 0..10 {
244 let s = seq_conv(&seq, &qual, cfg.qual_threshold);
245 let mut x = BfcKmer::NULL;
246 let mut l = 0;
247 for i in 0..s.len() {
248 if s[i].b < 4 {
249 x.append(cfg.k, u64::from(s[i].b));
250 l += 1;
251 if l >= cfg.k {
252 ch.add(&x, true);
253 }
254 } else {
255 l = 0;
256 x = BfcKmer::NULL;
257 }
258 }
259 }
260 let (out, _) = correct_one(&cfg, &ch, &rec, ch.hist_mode(cfg.min_cov))
261 .expect("clean high-cov read corrects");
262 assert!(
263 out.iter().all(u8::is_ascii_uppercase),
264 "a clean read must stay uppercase (no corrections): {:?}",
265 String::from_utf8_lossy(&out)
266 );
267 }
268
269 #[test]
273 fn greedy_rescue_picks_the_one_supported_substitution() {
274 let k = 11;
275 let truth = b"ACGTCAGTTGA"; let mut ch = CountTable {
277 k,
278 map: KmerMap::default(),
279 };
280 let mut tx = BfcKmer::NULL;
282 for &b in truth {
283 tx.append(k, u64::from(SEQ_NT4[b as usize]));
284 }
285 for _ in 0..20 {
286 ch.add(&tx, true);
287 }
288 let mut corrupt = BfcKmer::NULL;
292 for (i, &b) in truth.iter().enumerate() {
293 let base = if i == k - 1 {
294 3
295 } else {
296 u64::from(SEQ_NT4[b as usize])
297 };
298 corrupt.append(k, base);
299 }
300 let mode = ch.hist_mode(3);
301 let ec = bfc_ec_greedy_k(k, mode, &corrupt, &ch);
302 assert!(ec >= 0, "a confident single fix must be found");
303 assert_eq!(ec >> 2, 0, "the corrupted base is the 3'-most (d=0)");
304 assert_eq!(
305 (ec & 3) as u8,
306 SEQ_NT4[b'A' as usize],
307 "restored base must be the truth base A"
308 );
309 let empty = CountTable {
311 k,
312 map: KmerMap::default(),
313 };
314 assert_eq!(bfc_ec_greedy_k(k, 0, &corrupt, &empty), -1);
315 }
316}