1use anyhow::Context;
11use dashmap::DashMap;
12use slog::crit;
13use slog::{info, warn};
14
15use crate::diagnostics;
16use crate::prog_opts::GenPermitListOpts;
17use crate::utils as afutils;
18#[allow(unused_imports)]
19use ahash::{AHasher, RandomState};
20use bio_types::strand::Strand;
21use bstr::io::BufReadExt;
22use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
23use itertools::Itertools;
24use libradicl::exit_codes;
25use libradicl::rad_types::{self, RadType};
26use libradicl::BarcodeLookupMap;
27use libradicl::{chunk, record::AlevinFryReadRecord};
28use needletail::bitkmer::*;
29use num_format::{Locale, ToFormattedString};
30use serde::Serialize;
31use serde_json::json;
32use std::cmp;
33use std::collections::HashMap;
34use std::fs::File;
35use std::io::{BufRead, BufReader, Read};
36use std::io::{BufWriter, Write};
37use std::num::NonZeroUsize;
38use std::path::{Path, PathBuf};
39use std::sync::{atomic::Ordering, Arc};
40use std::time::Instant;
41
42#[derive(Clone, Debug, Serialize)]
43pub enum CellFilterMethod {
44 ForceCells(usize),
47 ExpectCells(usize),
50 ExplicitList(PathBuf),
54 UnfilteredExternalList(PathBuf, usize),
58 KneeFinding,
62}
63
64struct Point {
65 x: f64,
66 y: f64,
67}
68
69fn distance_to_line(p1: &Point, p2: &Point, q: &Point) -> f64 {
74 let x_0 = q.x;
75 let y_0 = q.y;
76
77 let x_1 = p1.x;
78 let y_1 = p1.y;
79
80 let x_2 = p2.x;
81 let y_2 = p2.y;
82
83 let numer = ((y_2 - y_1) * x_0 - (x_2 - x_1) * y_0 + x_2 * y_1 - y_2 * x_1).abs();
84 let denom = ((y_2 - y_1).powi(2) + (x_2 - x_1).powi(2)).sqrt();
85 assert!(denom > 0.0f64);
86 numer / denom
87}
88
89fn get_max_distance_index(sorted_frequencies: &[u64], is_cumulative: bool) -> usize {
105 assert!(sorted_frequencies.len() >= 2,
106 "ERROR: when attempting to find a knee-distance threshold, the list of putative cells is only of length {}. Cannot proceed. Please check the mapping rate.",
107 sorted_frequencies.len());
108 let first = sorted_frequencies
109 .first()
110 .expect("cannot process empty frequency list.");
111 let last = sorted_frequencies
112 .last()
113 .expect("cannot process empty frequency list.");
114
115 let max_x = sorted_frequencies.len() as f64;
117
118 let max_y = if is_cumulative {
121 *last as f64
122 } else {
123 *first as f64
124 };
125
126 let p1 = Point {
127 x: 0.0f64,
128 y: (*first as f64) / max_y,
129 };
130 let p2 = Point {
131 x: 1.0f64,
132 y: (*last as f64) / max_y,
133 };
134
135 let mut max_d: f64 = -1.0;
136 let mut max_ind: usize = 0;
137
138 for (ind, freq) in sorted_frequencies.iter().enumerate() {
139 let x = ind as f64 / max_x;
140 let y = *freq as f64 / max_y;
141 let q = Point { x, y };
142 let d = distance_to_line(&p1, &p2, &q);
143 if d >= max_d {
144 max_d = d;
145 max_ind = ind;
146 }
147 }
148 max_ind
149}
150
151fn get_knee(freq: &[u64], max_iterations: usize, log: &slog::Logger) -> usize {
159 let cfreq: Vec<u64> = freq
161 .iter()
162 .scan(0u64, |acc, &num| {
163 *acc += num;
164 Some(*acc)
165 })
166 .collect();
167 let mut prev_max = 0;
169 let mut max_idx = get_max_distance_index(&cfreq[..], true);
170
171 assert_ne!(max_idx, 0,
173 "get_knee determined a knee index of 0. This probably should not happen with valid input data.");
174
175 let mut iterations = 0;
176 let iter_slack = 5;
177 while max_idx - prev_max != 0 {
179 info!(log, "max_idx = {}", max_idx);
180 prev_max = max_idx;
181 iterations += 1;
182 if iterations % 10 == 0 {
183 info!(log, "knee-finding iter = {}", iterations);
184 }
185 if iterations > max_iterations {
186 break;
187 }
188 let last_idx = std::cmp::min(cfreq.len() - 1, max_idx * iter_slack);
189 max_idx = get_max_distance_index(&cfreq[0..last_idx], true);
190 assert_ne!(max_idx, 0,
191 "get_knee determined a knee index of 0. This probably should not happen with valid input data.");
192 }
193 max_idx
194}
195
196fn populate_unfiltered_barcode_map<T: Read>(
197 br: BufReader<T>,
198 first_bclen: &mut usize,
199) -> DashMap<u64, u64, ahash::RandomState> {
200 let s = ahash::RandomState::with_seeds(2u64, 7u64, 1u64, 8u64);
201 let hm = DashMap::with_hasher(s);
202
203 for l in br.byte_lines().flatten() {
207 if *first_bclen == 0 {
208 *first_bclen = l.len();
209 } else {
210 assert_eq!(
211 *first_bclen,
212 l.len(),
213 "found barcodes of different lengths {} and {}",
214 *first_bclen,
215 l.len()
216 );
217 }
218 if let Some((_, km, _)) =
219 needletail::bitkmer::BitNuclKmer::new(&l[..], l.len() as u8, false).next()
220 {
221 hm.insert(km.0, 0);
222 }
223 }
224 hm
225}
226
227#[allow(clippy::unnecessary_unwrap, clippy::too_many_arguments)]
228fn process_unfiltered(
229 hm: DashMap<u64, u64, ahash::RandomState>,
230 mut unmatched_bc: Vec<u64>,
231 file_tag_map: &rad_types::TagMap,
232 filter_meth: &CellFilterMethod,
233 expected_ori: Strand,
234 output_dir: &PathBuf,
235 version: &str,
236 max_ambiguity_read: usize,
237 velo_mode: bool,
238 cmdline: &str,
239 log: &slog::Logger,
240 gpl_opts: &GenPermitListOpts,
241) -> anyhow::Result<u64> {
242 let parent = std::path::Path::new(output_dir);
243 std::fs::create_dir_all(parent)
244 .with_context(|| format!("couldn't create directory path {}", parent.display()))?;
245
246 let min_freq = match filter_meth {
248 CellFilterMethod::UnfilteredExternalList(_, min_reads) => {
249 info!(log, "minimum num reads for barcode pass = {}", *min_reads);
250 *min_reads as u64
251 }
252 _ => {
253 unimplemented!();
254 }
255 };
256
257 let mut kept_bc = Vec::<u64>::new();
259
260 for mut kvp in hm.iter_mut() {
262 if *kvp.value() >= min_freq {
265 kept_bc.push(*kvp.key());
266 } else {
267 for _ in 0..*kvp.value() {
270 unmatched_bc.push(*kvp.key());
271 }
272 *kvp.value_mut() = 0u64;
274 }
275 }
276
277 hm.retain(|_, &mut v| v > 0);
279
280 let num_passing = kept_bc.len();
282 info!(
283 log,
284 "num_passing = {}",
285 num_passing.to_formatted_string(&Locale::en)
286 );
287
288 let barcode_tag = file_tag_map
289 .get("cblen")
290 .expect("tag map must contain cblen");
291 let barcode_len: u16 = barcode_tag.try_into()?;
292
293 let bcmap2 = BarcodeLookupMap::new(kept_bc, barcode_len as u32);
296 info!(
297 log,
298 "found {} cells with non-trivial number of reads by exact barcode match",
299 bcmap2.barcodes.len().to_formatted_string(&Locale::en)
300 );
301
302 let mut found_approx = 0usize;
308 let mut ambig_approx = 0usize;
309 let mut not_found = 0usize;
310
311 let start_unmatched_time = Instant::now();
312
313 unmatched_bc.sort_unstable();
314
315 let mut distinct_unmatched_bc = 0usize;
316 let mut distinct_recoverable_bc = 0usize;
317
318 let mut corrected_list = Vec::<(u64, u64)>::with_capacity(1_000_000);
320
321 for (count, ubc) in unmatched_bc.iter().dedup_with_count() {
322 match bcmap2.find_neighbors(*ubc, false) {
325 (Some(x), n) => {
327 let cbc = bcmap2.barcodes[x];
328 if cbc != *ubc && n == 1 {
331 if let Some(mut c) = hm.get_mut(&cbc) {
334 *c += count as u64;
335 corrected_list.push((*ubc, cbc));
336 }
337 found_approx += count;
339 distinct_recoverable_bc += 1;
340 }
341 if n > 1 {
345 ambig_approx += count;
346 }
347 }
348 (None, _) => {
352 not_found += count;
353 }
354 }
355 distinct_unmatched_bc += 1;
356 }
357 let unmatched_duration = start_unmatched_time.elapsed();
358 let num_corrected = distinct_recoverable_bc as u64;
359
360 info!(
361 log,
362 "There were {} distinct unmatched barcodes, and {} that can be recovered",
363 distinct_unmatched_bc.to_formatted_string(&Locale::en),
364 distinct_recoverable_bc.to_formatted_string(&Locale::en)
365 );
366 info!(
367 log,
368 "Matching unmatched barcodes to retained barcodes took {:?}", unmatched_duration
369 );
370 info!(log, "Of the unmatched barcodes\n============");
371 info!(
372 log,
373 "\t{} had exactly 1 single-edit neighbor in the retained list",
374 found_approx.to_formatted_string(&Locale::en)
375 );
376 info!(
377 log,
378 "\t{} had >1 single-edit neighbor in the retained list",
379 ambig_approx.to_formatted_string(&Locale::en)
380 );
381 info!(
382 log,
383 "\t{} had no neighbor in the retained list",
384 not_found.to_formatted_string(&Locale::en)
385 );
386
387 let parent = std::path::Path::new(output_dir);
388 std::fs::create_dir_all(parent).with_context(|| {
389 format!(
390 "couldn't create path to output directory {}",
391 parent.display()
392 )
393 })?;
394 let o_path = parent.join("permit_freq.bin");
395
396 let mut hm: HashMap<u64, u64, ahash::RandomState> = hm.into_iter().collect();
398 match afutils::write_permit_list_freq(&o_path, barcode_len, &hm) {
399 Ok(_) => {}
400 Err(error) => {
401 panic!("Error: {}", error);
402 }
403 };
404
405 for (k, v) in hm.iter_mut() {
416 *v = *k;
418 }
420 for (uncorrected, corrected) in corrected_list.iter() {
421 hm.insert(*uncorrected, *corrected);
422 }
423
424 let pm_path = parent.join("permit_map.bin");
425 let pm_file = std::fs::File::create(pm_path).context("could not create serialization file.")?;
426 let mut pm_writer = BufWriter::new(&pm_file);
427 bincode::serialize_into(&mut pm_writer, &hm)
428 .context("couldn't serialize permit list mapping.")?;
429
430 let meta_info = json!({
431 "velo_mode" : velo_mode,
432 "expected_ori" : *expected_ori.strand_symbol(),
433 "version_str" : version,
434 "max-ambig-record" : max_ambiguity_read,
435 "cmd" : cmdline,
436 "permit-list-type" : "unfiltered",
437 "gpl_options" : &gpl_opts
438 });
439
440 let m_path = parent.join("generate_permit_list.json");
441 let mut m_file = std::fs::File::create(m_path).context("could not create metadata file.")?;
442
443 let meta_info_string =
444 serde_json::to_string_pretty(&meta_info).context("could not format json.")?;
445 m_file
446 .write_all(meta_info_string.as_bytes())
447 .context("cannot write to generate_permit_list.json file")?;
448
449 info!(
450 log,
451 "total number of distinct corrected barcodes : {}",
452 num_corrected.to_formatted_string(&Locale::en)
453 );
454
455 Ok(num_corrected)
456}
457
458#[allow(clippy::unnecessary_unwrap, clippy::too_many_arguments)]
459fn process_filtered(
460 hm: DashMap<u64, u64, ahash::RandomState>,
461 file_tag_map: &rad_types::TagMap,
462 filter_meth: &CellFilterMethod,
463 expected_ori: Strand,
464 output_dir: &PathBuf,
465 version: &str,
466 max_ambiguity_read: usize,
467 velo_mode: bool,
468 cmdline: &str,
469 log: &slog::Logger,
470 gpl_opts: &GenPermitListOpts,
471) -> anyhow::Result<u64> {
472 let valid_bc: Vec<u64>;
473 let hm: HashMap<u64, u64, ahash::RandomState> = hm.into_iter().collect();
474 let mut freq: Vec<u64> = hm.values().cloned().collect();
475 freq.sort_unstable();
476 freq.reverse();
477
478 let barcode_tag = file_tag_map
479 .get("cblen")
480 .expect("tag map must contain cblen");
481 let barcode_len: u16 = barcode_tag.try_into()?;
482
483 match filter_meth {
485 CellFilterMethod::KneeFinding => {
486 let num_bc = get_knee(&freq[..], 100, log);
487 let min_freq = freq[num_bc];
488
489 valid_bc = permit_list_from_threshold(&hm, min_freq);
492 info!(
493 log,
494 "knee distance method resulted in the selection of {} permitted barcodes.",
495 valid_bc.len()
496 );
497 }
498 CellFilterMethod::ForceCells(top_k) => {
499 let num_bc = if freq.len() < *top_k {
500 freq.len() - 1
501 } else {
502 top_k - 1
503 };
504
505 let min_freq = freq[num_bc];
506
507 valid_bc = permit_list_from_threshold(&hm, min_freq);
510 }
511 CellFilterMethod::ExplicitList(valid_bc_file) => {
512 valid_bc = permit_list_from_file(valid_bc_file, barcode_len);
513 }
514 CellFilterMethod::ExpectCells(expected_num_cells) => {
515 let robust_quantile = 0.99f64;
516 let robust_div = 10.0f64;
517 let robust_ind = (*expected_num_cells as f64 * robust_quantile).round() as u64;
518 let ind = cmp::min(freq.len() - 1, robust_ind as usize);
520 let robust_freq = freq[ind];
521 let min_freq = std::cmp::max(1u64, (robust_freq as f64 / robust_div).round() as u64);
522 valid_bc = permit_list_from_threshold(&hm, min_freq);
523 }
524 CellFilterMethod::UnfilteredExternalList(_, _min_reads) => {
525 unimplemented!();
526 }
527 }
528
529 let full_permit_list =
532 afutils::generate_permitlist_map(&valid_bc, barcode_len as usize).unwrap();
533
534 let s2 = ahash::RandomState::with_seeds(2u64, 7u64, 1u64, 8u64);
535 let mut permitted_map = HashMap::with_capacity_and_hasher(valid_bc.len(), s2);
536
537 let mut num_corrected = 0;
538 for (k, v) in hm.iter() {
539 if let Some(&valid_key) = full_permit_list.get(k) {
540 *permitted_map.entry(valid_key).or_insert(0u64) += *v;
541 num_corrected += 1;
542 }
544 }
545
546 let parent = std::path::Path::new(output_dir);
547 std::fs::create_dir_all(parent).with_context(|| {
548 format!(
549 "failed to create path to output location {}",
550 parent.display()
551 )
552 })?;
553 let o_path = parent.join("permit_freq.bin");
554
555 match afutils::write_permit_list_freq(&o_path, barcode_len, &permitted_map) {
556 Ok(_) => {}
557 Err(error) => {
558 panic!("Error: {}", error);
559 }
560 };
561
562 let o_path = parent.join("all_freq.bin");
563
564 match afutils::write_permit_list_freq(&o_path, barcode_len, &hm) {
565 Ok(_) => {}
566 Err(error) => {
567 panic!("Error: {}", error);
568 }
569 };
570
571 let s_path = parent.join("permit_map.bin");
572 let s_file = std::fs::File::create(s_path).context("could not create serialization file.")?;
573 let mut s_writer = BufWriter::new(&s_file);
574 bincode::serialize_into(&mut s_writer, &full_permit_list)
575 .context("couldn't serialize permit list.")?;
576
577 let meta_info = json!({
578 "velo_mode" : velo_mode,
579 "expected_ori" : *expected_ori.strand_symbol(),
580 "version_str" : version,
581 "max-ambig-record" : max_ambiguity_read,
582 "cmd" : cmdline,
583 "permit-list-type" : "filtered",
584 "gpl_options" : &gpl_opts
585 });
586
587 let m_path = parent.join("generate_permit_list.json");
588 let mut m_file = std::fs::File::create(m_path).context("could not create metadata file.")?;
589
590 let meta_info_string =
591 serde_json::to_string_pretty(&meta_info).context("could not format json.")?;
592 m_file
593 .write_all(meta_info_string.as_bytes())
594 .context("cannot write to generate_permit_list.json file")?;
595
596 info!(
597 log,
598 "total number of distinct corrected barcodes : {}",
599 num_corrected.to_formatted_string(&Locale::en)
600 );
601
602 Ok(num_corrected)
603}
604
605pub fn generate_permit_list(gpl_opts: GenPermitListOpts) -> anyhow::Result<u64> {
611 let rad_dir = gpl_opts.input_dir;
612 let output_dir = gpl_opts.output_dir;
613 let filter_meth = gpl_opts.fmeth.clone();
614 let expected_ori = gpl_opts.expected_ori;
615 let version = gpl_opts.version;
616 let velo_mode = gpl_opts.velo_mode;
617 let cmdline = gpl_opts.cmdline;
618 let log = gpl_opts.log;
619
620 let i_dir = std::path::Path::new(&rad_dir);
621
622 if !i_dir.exists() {
625 crit!(
626 log,
627 "the input RAD path {} does not exist",
628 rad_dir.display()
629 );
630 anyhow::bail!("execution terminated because input RAD path does not exist.");
632 }
633
634 let mut first_bclen = 0usize;
635 let mut unfiltered_bc_counts = None;
636 if let CellFilterMethod::UnfilteredExternalList(fname, _) = &filter_meth {
637 let (reader, compression) = niffler::from_path(fname)
638 .with_context(|| format!("coult not open input file {}", fname.display()))?;
639 let br = BufReader::new(reader);
640
641 info!(
642 log,
643 "reading permit list from {}; inferred format {:#?}",
644 fname.display(),
645 compression
646 );
647
648 unfiltered_bc_counts = Some(populate_unfiltered_barcode_map(br, &mut first_bclen));
649 info!(
650 log,
651 "number of unfiltered bcs read = {}",
652 unfiltered_bc_counts
653 .as_ref()
654 .unwrap()
655 .len()
656 .to_formatted_string(&Locale::en)
657 );
658 }
659
660 let nworkers: usize = gpl_opts.threads;
661 let i_file = File::open(i_dir.join("map.rad")).context("could not open input rad file")?;
662 let ifile = BufReader::new(i_file);
663 let mut rad_reader = libradicl::readers::ParallelRadReader::<
664 AlevinFryReadRecord,
665 BufReader<File>,
666 >::new(ifile, NonZeroUsize::new(nworkers).unwrap());
667
668 let hdr = &rad_reader.prelude.hdr;
669 info!(
670 log,
671 "paired : {:?}, ref_count : {}, num_chunks : {}",
672 hdr.is_paired != 0,
673 hdr.ref_count.to_formatted_string(&Locale::en),
674 hdr.num_chunks.to_formatted_string(&Locale::en)
675 );
676 let num_chunks = hdr.num_chunks();
677
678 let fl_tags = &rad_reader.prelude.file_tags;
680 info!(log, "read {:?} file-level tags", fl_tags.tags.len());
681 let rl_tags = &rad_reader.prelude.read_tags;
683 info!(log, "read {:?} read-level tags", rl_tags.tags.len());
684
685 const BNAME: &str = "b";
687 const UNAME: &str = "u";
688
689 let mut bct: Option<RadType> = None;
690 let mut umit: Option<RadType> = None;
691
692 for rt in &rl_tags.tags {
693 if rt.name == BNAME || rt.name == UNAME {
695 if !rt.typeid.is_int_type() {
696 crit!(
697 log,
698 "currently only RAD types 1--4 are supported for 'b' and 'u' tags."
699 );
700 std::process::exit(exit_codes::EXIT_UNSUPPORTED_TAG_TYPE);
701 }
702
703 if rt.name == BNAME {
704 bct = Some(rt.typeid);
705 }
706 if rt.name == UNAME {
707 umit = Some(rt.typeid);
708 }
709 }
710 }
711 assert!(bct.is_some(), "barcode type tag must be present.");
712 assert!(umit.is_some(), "umi type tag must be present.");
713
714 let al_tags = &rad_reader.prelude.aln_tags;
716 info!(log, "read {:?} alignemnt-level tags", al_tags.tags.len());
717
718 {
719 let file_tag_map = &rad_reader.file_tag_map;
720 info!(log, "File-level tag values {:?}", file_tag_map);
721 }
722
723 let mut num_reads: usize = 0;
724
725 let s = ahash::RandomState::with_seeds(2u64, 7u64, 1u64, 8u64);
727 let hm = std::sync::Arc::new(DashMap::with_hasher(s));
728
729 let mut unmatched_bc: Vec<u64>;
732 let mut num_orientation_compat_reads = 0usize;
733 let mut max_ambiguity_read = 0usize;
734
735 let nc = num_chunks.expect("unknwon number of chunks").get() as u64;
736 let pbar = ProgressBar::new(nc);
737 pbar.set_style(
738 ProgressStyle::with_template(
739 "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
740 )
741 .unwrap()
742 .progress_chars("##-"),
743 );
744 pbar.set_draw_target(ProgressDrawTarget::stderr_with_hz(5));
745 let cb = |_new_bytes: u64, new_chunks: u64| {
746 pbar.inc(new_chunks);
747 };
748
749 match filter_meth {
750 CellFilterMethod::UnfilteredExternalList(_, _min_reads) => {
751 unmatched_bc = Vec::with_capacity(10000000);
752 if unfiltered_bc_counts.is_some() {
754 let hmu = std::thread::scope(|s| {
755 let hmu = std::sync::Arc::new(unfiltered_bc_counts.unwrap());
756 let mut handles = Vec::<
757 std::thread::ScopedJoinHandle<(usize, usize, Vec<u64>, usize)>,
758 >::new();
759 for _ in 0..nworkers {
760 let rd = rad_reader.is_done();
761 let q = rad_reader.get_queue();
762 let hmu = hmu.clone();
763 let handle = s.spawn(move || {
764 let mut unmatched_bc = Vec::<u64>::new();
765 let mut max_ambiguity_read = 0usize;
766 let mut num_reads = 0;
767 let mut num_orientation_compat_reads = 0;
768 while !rd.load(Ordering::SeqCst) || !q.is_empty() {
769 while let Some(meta_chunk) = q.pop() {
770 for c in meta_chunk.iter() {
771 num_orientation_compat_reads +=
772 update_barcode_hist_unfiltered(
773 &hmu,
774 &mut unmatched_bc,
775 &mut max_ambiguity_read,
776 &c,
777 &expected_ori,
778 );
779 num_reads += c.reads.len();
780 }
781 }
782 }
783 (
784 num_reads,
785 num_orientation_compat_reads,
786 unmatched_bc,
787 max_ambiguity_read,
788 )
789 });
790 handles.push(handle);
791 }
792 let _ = rad_reader.start_chunk_parsing(Some(cb)); for handle in handles {
794 let (nr, nocr, ubc, mar) =
795 handle.join().expect("The parsing thread panicked");
796 num_reads += nr;
797 num_orientation_compat_reads += nocr;
798 unmatched_bc.extend_from_slice(&ubc);
799 max_ambiguity_read = max_ambiguity_read.max(mar);
800 }
801 pbar.finish_with_message("finished parsing RAD file\n");
802 std::sync::Arc::<DashMap<u64, u64, ahash::RandomState>>::into_inner(hmu)
804 });
805 info!(
820 log,
821 "observed {} reads ({} orientation consistent) in {} chunks --- max ambiguity read occurs in {} refs",
822 num_reads.to_formatted_string(&Locale::en),
823 num_orientation_compat_reads.to_formatted_string(&Locale::en),
824 num_chunks.expect("nonzero").to_formatted_string(&Locale::en),
825 max_ambiguity_read.to_formatted_string(&Locale::en)
826 );
827 let valid_thresh = 0.3f64;
828 match diagnostics::likely_valid_permit_list(
829 unmatched_bc.len(),
830 num_reads,
831 valid_thresh,
832 ) {
833 Ok(f) => {
834 info!(log,
835 "The percentage of mapped reads not matching a known barcode exactly is {:.3}%, which is < the warning threshold {:.3}%",
836 f * 100f64, valid_thresh * 100f64);
837 }
838 Err(e) => {
839 warn!(log, "{:?}", e);
840 }
841 }
842
843 process_unfiltered(
844 hmu.unwrap(),
845 unmatched_bc,
846 &rad_reader.file_tag_map,
847 &filter_meth,
848 expected_ori,
849 output_dir,
850 version,
851 max_ambiguity_read,
852 velo_mode,
853 cmdline,
854 log,
855 &gpl_opts,
856 )
857 } else {
858 Ok(0)
859 }
860 }
861 _ => {
862 let hm = std::thread::scope(|s| {
863 let mut handles =
864 Vec::<std::thread::ScopedJoinHandle<(usize, usize, usize)>>::new();
865 for _ in 0..nworkers {
866 let rd = rad_reader.is_done();
867 let q = rad_reader.get_queue();
868 let hm = hm.clone();
869 let handle = s.spawn(move || {
870 let mut max_ambiguity_read = 0usize;
871 let mut num_reads = 0;
872 while !rd.load(Ordering::SeqCst) || !q.is_empty() {
873 while let Some(meta_chunk) = q.pop() {
874 for c in meta_chunk.iter() {
875 update_barcode_hist(
876 &hm,
877 &mut max_ambiguity_read,
878 &c,
879 &expected_ori,
880 );
881 num_reads += c.reads.len();
882 }
883 }
884 }
885 (num_reads, num_orientation_compat_reads, max_ambiguity_read)
886 });
887 handles.push(handle);
888 }
889 let _ = rad_reader.start_chunk_parsing(Some(cb));
890 for handle in handles {
891 let (nr, nocr, mar) = handle.join().expect("The parsing thread panicked");
892 num_reads += nr;
893 num_orientation_compat_reads += nocr;
894 max_ambiguity_read = max_ambiguity_read.max(mar);
895 }
896 pbar.finish_with_message("finished parsing RAD file\n");
897 Arc::<DashMap<u64, u64, ahash::RandomState>>::into_inner(hm)
899 .expect("unique reference to DashMap")
900 });
901 info!(
902 log,
903 "observed {} reads in {} chunks --- max ambiguity read occurs in {} refs",
904 num_reads.to_formatted_string(&Locale::en),
905 num_chunks.unwrap().to_formatted_string(&Locale::en),
906 max_ambiguity_read.to_formatted_string(&Locale::en)
907 );
908 process_filtered(
909 hm,
910 &rad_reader.file_tag_map,
911 &filter_meth,
912 expected_ori,
913 output_dir,
914 version,
915 max_ambiguity_read,
916 velo_mode,
917 cmdline,
918 log,
919 &gpl_opts,
920 )
921 }
922 }
923
924 }
1052
1053pub fn update_barcode_hist_unfiltered(
1054 hist: &DashMap<u64, u64, ahash::RandomState>,
1055 unmatched_bc: &mut Vec<u64>,
1056 max_ambiguity_read: &mut usize,
1057 chunk: &chunk::Chunk<AlevinFryReadRecord>,
1058 expected_ori: &Strand,
1059) -> usize {
1060 let mut num_strand_compat_reads = 0usize;
1061 match expected_ori {
1062 Strand::Unknown => {
1063 for r in &chunk.reads {
1064 num_strand_compat_reads += 1;
1065 *max_ambiguity_read = r.refs.len().max(*max_ambiguity_read);
1066 match hist.get_mut(&r.bc) {
1069 Some(mut c) => *c += 1,
1071 None => {
1073 unmatched_bc.push(r.bc);
1074 }
1075 }
1076 }
1077 }
1078 Strand::Forward => {
1079 for r in &chunk.reads {
1080 if r.dirs.iter().any(|&x| x) {
1081 num_strand_compat_reads += 1;
1082 *max_ambiguity_read = r.refs.len().max(*max_ambiguity_read);
1083 match hist.get_mut(&r.bc) {
1086 Some(mut c) => *c += 1,
1088 None => {
1090 unmatched_bc.push(r.bc);
1091 }
1092 }
1093 }
1094 }
1095 }
1096 Strand::Reverse => {
1097 for r in &chunk.reads {
1098 if r.dirs.iter().any(|&x| !x) {
1099 num_strand_compat_reads += 1;
1100 *max_ambiguity_read = r.refs.len().max(*max_ambiguity_read);
1101 match hist.get_mut(&r.bc) {
1104 Some(mut c) => *c += 1,
1106 None => {
1108 unmatched_bc.push(r.bc);
1109 }
1110 }
1111 }
1112 }
1113 }
1114 }
1115 num_strand_compat_reads
1116}
1117
1118pub fn update_barcode_hist(
1119 hist: &DashMap<u64, u64, ahash::RandomState>,
1120 max_ambiguity_read: &mut usize,
1121 chunk: &chunk::Chunk<AlevinFryReadRecord>,
1122 expected_ori: &Strand,
1123) {
1124 match expected_ori {
1125 Strand::Unknown => {
1126 for r in &chunk.reads {
1127 *max_ambiguity_read = r.refs.len().max(*max_ambiguity_read);
1128 *hist.entry(r.bc).or_insert(0) += 1;
1129 }
1130 }
1131 Strand::Forward => {
1132 for r in &chunk.reads {
1133 if r.dirs.iter().any(|&x| x) {
1134 *max_ambiguity_read = r.refs.len().max(*max_ambiguity_read);
1135 *hist.entry(r.bc).or_insert(0) += 1;
1136 }
1137 }
1138 }
1139 Strand::Reverse => {
1140 for r in &chunk.reads {
1141 if r.dirs.iter().any(|&x| !x) {
1142 *max_ambiguity_read = r.refs.len().max(*max_ambiguity_read);
1143 *hist.entry(r.bc).or_insert(0) += 1;
1144 }
1145 }
1146 }
1147 }
1148}
1149
1150pub fn permit_list_from_threshold(
1151 hist: &HashMap<u64, u64, ahash::RandomState>,
1152 min_freq: u64,
1153) -> Vec<u64> {
1154 let valid_bc: Vec<u64> = hist
1155 .iter()
1156 .filter_map(|(k, v)| if v >= &min_freq { Some(*k) } else { None })
1157 .collect();
1158 valid_bc
1159}
1160
1161pub fn permit_list_from_file<P>(ifile: P, bclen: u16) -> Vec<u64>
1162where
1163 P: AsRef<Path>,
1164{
1165 let f = File::open(ifile).expect("couldn't open input barcode file.");
1166 let br = BufReader::new(f);
1167 let mut bc = Vec::<u64>::with_capacity(10_000);
1168
1169 for l in br.lines() {
1170 let line = l.expect("couldn't read line from barcode file.");
1171 let mut bnk = BitNuclKmer::new(line.as_bytes(), bclen as u8, false);
1172 let (_, k, _) = bnk.next().expect("can't extract kmer");
1173 bc.push(k.0);
1174 }
1175 bc
1176}