alevin_fry/
cellfilter.rs

1/*
2 * Copyright (c) 2020-2024 COMBINE-lab.
3 *
4 * This file is part of alevin-fry
5 * (see https://www.github.com/COMBINE-lab/alevin-fry).
6 *
7 * License: 3-clause BSD, see https://opensource.org/licenses/BSD-3-Clause
8 */
9
10use anyhow::Context;
11use dashmap::DashMap;
12use slog::crit;
13use slog::{info, warn};
14
15use crate::diagnostics;
16use crate::prog_opts::GenPermitListOpts;
17use crate::utils as afutils;
18#[allow(unused_imports)]
19use ahash::{AHasher, RandomState};
20use bio_types::strand::Strand;
21use bstr::io::BufReadExt;
22use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
23use itertools::Itertools;
24use libradicl::exit_codes;
25use libradicl::rad_types::{self, RadType};
26use libradicl::BarcodeLookupMap;
27use libradicl::{chunk, record::AlevinFryReadRecord};
28use needletail::bitkmer::*;
29use num_format::{Locale, ToFormattedString};
30use serde::Serialize;
31use serde_json::json;
32use std::cmp;
33use std::collections::HashMap;
34use std::fs::File;
35use std::io::{BufRead, BufReader, Read};
36use std::io::{BufWriter, Write};
37use std::num::NonZeroUsize;
38use std::path::{Path, PathBuf};
39use std::sync::{atomic::Ordering, Arc};
40use std::time::Instant;
41
42#[derive(Clone, Debug, Serialize)]
43pub enum CellFilterMethod {
44    // cut off at this cell in
45    // the frequency sorted list
46    ForceCells(usize),
47    // use this cell as a hint in
48    // the frequency sorted list
49    ExpectCells(usize),
50    // correct all cells in an
51    // edit distance of 1 of these
52    // barcodes
53    ExplicitList(PathBuf),
54    // barcodes will be provided in the
55    // form of an *unfiltered* external
56    // permit list
57    UnfilteredExternalList(PathBuf, usize),
58    // use the distance method to
59    // automatically find the knee
60    // in the curve
61    KneeFinding,
62}
63
64struct Point {
65    x: f64,
66    y: f64,
67}
68
69/// compute the distance between the query point `Q`
70/// and the line defined by points `P1` and `P2`.  The
71/// formula used here is taken from :
72/// https://en.wikipedia.org/wiki/Distance_from_a_point_to_a_line
73fn distance_to_line(p1: &Point, p2: &Point, q: &Point) -> f64 {
74    let x_0 = q.x;
75    let y_0 = q.y;
76
77    let x_1 = p1.x;
78    let y_1 = p1.y;
79
80    let x_2 = p2.x;
81    let y_2 = p2.y;
82
83    let numer = ((y_2 - y_1) * x_0 - (x_2 - x_1) * y_0 + x_2 * y_1 - y_2 * x_1).abs();
84    let denom = ((y_2 - y_1).powi(2) + (x_2 - x_1).powi(2)).sqrt();
85    assert!(denom > 0.0f64);
86    numer / denom
87}
88
89/// This method is a implementation of the distance method
90/// used in umi_tools :
91///     Smith, Tom, Andreas Heger, and Ian Sudbery.
92///     "UMI-tools: modeling sequencing errors in Unique Molecular Identifiers to improve quantification accuracy."
93///     Genome research 27.3 (2017): 491-499.
94///
95/// though this is a re-implementation and uses the same basic algorithm
96/// the result may not be identical.
97///
98/// Given a list of cumulative frequencies, where the index of a
99/// point is interpreted as its x-coordinate and its frequency
100/// is interpreted as its y-coordinate, define the line L based on
101/// `sorted_frequences.first()` and `sorted_frequencies.last()`.  Compute
102/// the distance of each point from L, and return the index of the point
103/// having the maximum distance.
104fn get_max_distance_index(sorted_frequencies: &[u64], is_cumulative: bool) -> usize {
105    assert!(sorted_frequencies.len() >= 2,
106        "ERROR: when attempting to find a knee-distance threshold, the list of putative cells is only of length {}. Cannot proceed. Please check the mapping rate.",
107        sorted_frequencies.len());
108    let first = sorted_frequencies
109        .first()
110        .expect("cannot process empty frequency list.");
111    let last = sorted_frequencies
112        .last()
113        .expect("cannot process empty frequency list.");
114
115    // length as a float
116    let max_x = sorted_frequencies.len() as f64;
117
118    // if the distribution is cumulative, then the smallest y coordinate is
119    // f, otherewise it is l
120    let max_y = if is_cumulative {
121        *last as f64
122    } else {
123        *first as f64
124    };
125
126    let p1 = Point {
127        x: 0.0f64,
128        y: (*first as f64) / max_y,
129    };
130    let p2 = Point {
131        x: 1.0f64,
132        y: (*last as f64) / max_y,
133    };
134
135    let mut max_d: f64 = -1.0;
136    let mut max_ind: usize = 0;
137
138    for (ind, freq) in sorted_frequencies.iter().enumerate() {
139        let x = ind as f64 / max_x;
140        let y = *freq as f64 / max_y;
141        let q = Point { x, y };
142        let d = distance_to_line(&p1, &p2, &q);
143        if d >= max_d {
144            max_d = d;
145            max_ind = ind;
146        }
147    }
148    max_ind
149}
150
151/// Get the knee of the cure using the `distance` method as described
152/// in the [UMI-tools documentation](https://github.com/CGATOxford/UMI-tools).
153/// This method takes a reverse-sorted (sorted in descending order) llist of
154/// frequencies, and a maximum number of iterations to run the algorithm.  It
155/// returns the point on the CDF of the reverse-sorted frequency vector that is
156/// farthest from the line defined by the end-points.  The algorithm is taken from
157/// [here](https://github.com/CGATOxford/UMI-tools/blob/master/umi_tools/whitelist_methods.py#L248).
158fn get_knee(freq: &[u64], max_iterations: usize, log: &slog::Logger) -> usize {
159    // get the cumulative frequency from the frequency
160    let cfreq: Vec<u64> = freq
161        .iter()
162        .scan(0u64, |acc, &num| {
163            *acc += num;
164            Some(*acc)
165        })
166        .collect();
167    // get the guess about the max distance point
168    let mut prev_max = 0;
169    let mut max_idx = get_max_distance_index(&cfreq[..], true);
170
171    // if we think we should include no cells, something is probably wrong.
172    assert_ne!(max_idx, 0,
173	       "get_knee determined a knee index of 0. This probably should not happen with valid input data.");
174
175    let mut iterations = 0;
176    let iter_slack = 5;
177    // while our algorithm hasn't converged
178    while max_idx - prev_max != 0 {
179        info!(log, "max_idx = {}", max_idx);
180        prev_max = max_idx;
181        iterations += 1;
182        if iterations % 10 == 0 {
183            info!(log, "knee-finding iter = {}", iterations);
184        }
185        if iterations > max_iterations {
186            break;
187        }
188        let last_idx = std::cmp::min(cfreq.len() - 1, max_idx * iter_slack);
189        max_idx = get_max_distance_index(&cfreq[0..last_idx], true);
190        assert_ne!(max_idx, 0,
191	       "get_knee determined a knee index of 0. This probably should not happen with valid input data.");
192    }
193    max_idx
194}
195
196fn populate_unfiltered_barcode_map<T: Read>(
197    br: BufReader<T>,
198    first_bclen: &mut usize,
199) -> DashMap<u64, u64, ahash::RandomState> {
200    let s = ahash::RandomState::with_seeds(2u64, 7u64, 1u64, 8u64);
201    let hm = DashMap::with_hasher(s);
202
203    // read through the external unfiltered barcode list
204    // and generate a vector of encoded barcodes
205    // let mut kv = Vec::<u64>::new();
206    for l in br.byte_lines().flatten() {
207        if *first_bclen == 0 {
208            *first_bclen = l.len();
209        } else {
210            assert_eq!(
211                *first_bclen,
212                l.len(),
213                "found barcodes of different lengths {} and {}",
214                *first_bclen,
215                l.len()
216            );
217        }
218        if let Some((_, km, _)) =
219            needletail::bitkmer::BitNuclKmer::new(&l[..], l.len() as u8, false).next()
220        {
221            hm.insert(km.0, 0);
222        }
223    }
224    hm
225}
226
227#[allow(clippy::unnecessary_unwrap, clippy::too_many_arguments)]
228fn process_unfiltered(
229    hm: DashMap<u64, u64, ahash::RandomState>,
230    mut unmatched_bc: Vec<u64>,
231    file_tag_map: &rad_types::TagMap,
232    filter_meth: &CellFilterMethod,
233    expected_ori: Strand,
234    output_dir: &PathBuf,
235    version: &str,
236    max_ambiguity_read: usize,
237    velo_mode: bool,
238    cmdline: &str,
239    log: &slog::Logger,
240    gpl_opts: &GenPermitListOpts,
241) -> anyhow::Result<u64> {
242    let parent = std::path::Path::new(output_dir);
243    std::fs::create_dir_all(parent)
244        .with_context(|| format!("couldn't create directory path {}", parent.display()))?;
245
246    // the smallest number of reads we'll allow per barcode
247    let min_freq = match filter_meth {
248        CellFilterMethod::UnfilteredExternalList(_, min_reads) => {
249            info!(log, "minimum num reads for barcode pass = {}", *min_reads);
250            *min_reads as u64
251        }
252        _ => {
253            unimplemented!();
254        }
255    };
256
257    // the set of barcodes we'll keep
258    let mut kept_bc = Vec::<u64>::new();
259
260    // iterate over the count map
261    for mut kvp in hm.iter_mut() {
262        // if this satisfies our requirement for the minimum count
263        // then keep this barcode
264        if *kvp.value() >= min_freq {
265            kept_bc.push(*kvp.key());
266        } else {
267            // otherwise, we have to add this barcode's
268            // counts to our unmatched list
269            for _ in 0..*kvp.value() {
270                unmatched_bc.push(*kvp.key());
271            }
272            // and then reset the counter for this barcode to 0
273            *kvp.value_mut() = 0u64;
274        }
275    }
276
277    // drop the absent barcodes from hm
278    hm.retain(|_, &mut v| v > 0);
279
280    // how many we will keep
281    let num_passing = kept_bc.len();
282    info!(
283        log,
284        "num_passing = {}",
285        num_passing.to_formatted_string(&Locale::en)
286    );
287
288    let barcode_tag = file_tag_map
289        .get("cblen")
290        .expect("tag map must contain cblen");
291    let barcode_len: u16 = barcode_tag.try_into()?;
292
293    // now, we create a second barcode map with just the barcodes
294    // for cells we will keep / rescue.
295    let bcmap2 = BarcodeLookupMap::new(kept_bc, barcode_len as u32);
296    info!(
297        log,
298        "found {} cells with non-trivial number of reads by exact barcode match",
299        bcmap2.barcodes.len().to_formatted_string(&Locale::en)
300    );
301
302    // finally, we'll go through the set of unmatched barcodes
303    // and try to rescue those that have a *unique* neighbor in the
304    // list of retained barcodes.
305
306    //let mut found_exact = 0usize;
307    let mut found_approx = 0usize;
308    let mut ambig_approx = 0usize;
309    let mut not_found = 0usize;
310
311    let start_unmatched_time = Instant::now();
312
313    unmatched_bc.sort_unstable();
314
315    let mut distinct_unmatched_bc = 0usize;
316    let mut distinct_recoverable_bc = 0usize;
317
318    // mapping the uncorrected barcode to what it corrects to
319    let mut corrected_list = Vec::<(u64, u64)>::with_capacity(1_000_000);
320
321    for (count, ubc) in unmatched_bc.iter().dedup_with_count() {
322        // try to find the unmatched barcode, but
323        // look up to 1 edit away
324        match bcmap2.find_neighbors(*ubc, false) {
325            // if we have a match
326            (Some(x), n) => {
327                let cbc = bcmap2.barcodes[x];
328                // if the uncorrected barcode had a
329                // single, unique retained neighbor
330                if cbc != *ubc && n == 1 {
331                    // then increment the count of this
332                    // barcode by 1 (because we'll correct to it)
333                    if let Some(mut c) = hm.get_mut(&cbc) {
334                        *c += count as u64;
335                        corrected_list.push((*ubc, cbc));
336                    }
337                    // this counts as an approximate find
338                    found_approx += count;
339                    distinct_recoverable_bc += 1;
340                }
341                // if we had > 1 single-mismatch neighbor
342                // then don't keep the barcode, but remember
343                // the count of such events
344                if n > 1 {
345                    ambig_approx += count;
346                }
347            }
348            // if we had no single-mismatch neighbor
349            // then this barcode is not_found and gets
350            // dropped.
351            (None, _) => {
352                not_found += count;
353            }
354        }
355        distinct_unmatched_bc += 1;
356    }
357    let unmatched_duration = start_unmatched_time.elapsed();
358    let num_corrected = distinct_recoverable_bc as u64;
359
360    info!(
361        log,
362        "There were {} distinct unmatched barcodes, and {} that can be recovered",
363        distinct_unmatched_bc.to_formatted_string(&Locale::en),
364        distinct_recoverable_bc.to_formatted_string(&Locale::en)
365    );
366    info!(
367        log,
368        "Matching unmatched barcodes to retained barcodes took {:?}", unmatched_duration
369    );
370    info!(log, "Of the unmatched barcodes\n============");
371    info!(
372        log,
373        "\t{} had exactly 1 single-edit neighbor in the retained list",
374        found_approx.to_formatted_string(&Locale::en)
375    );
376    info!(
377        log,
378        "\t{} had >1 single-edit neighbor in the retained list",
379        ambig_approx.to_formatted_string(&Locale::en)
380    );
381    info!(
382        log,
383        "\t{} had no neighbor in the retained list",
384        not_found.to_formatted_string(&Locale::en)
385    );
386
387    let parent = std::path::Path::new(output_dir);
388    std::fs::create_dir_all(parent).with_context(|| {
389        format!(
390            "couldn't create path to output directory {}",
391            parent.display()
392        )
393    })?;
394    let o_path = parent.join("permit_freq.bin");
395
396    // convert the DashMap to a HashMap
397    let mut hm: HashMap<u64, u64, ahash::RandomState> = hm.into_iter().collect();
398    match afutils::write_permit_list_freq(&o_path, barcode_len, &hm) {
399        Ok(_) => {}
400        Err(error) => {
401            panic!("Error: {}", error);
402        }
403    };
404
405    /*
406    // don't need this right now
407    let s_path = parent.join("bcmap.bin");
408    let s_file = std::fs::File::create(&s_path).expect("could not create serialization file.");
409    let mut s_writer = BufWriter::new(&s_file);
410    bincode::serialize_into(&mut s_writer, &bcmap2).expect("couldn't serialize barcode list.");
411    */
412
413    // now that we are done with using hm to count, we can repurpose it as
414    // the correction map.
415    for (k, v) in hm.iter_mut() {
416        // each present barcode corrects to itself
417        *v = *k;
418        //*kvp.value_mut() = *kvp.key();
419    }
420    for (uncorrected, corrected) in corrected_list.iter() {
421        hm.insert(*uncorrected, *corrected);
422    }
423
424    let pm_path = parent.join("permit_map.bin");
425    let pm_file = std::fs::File::create(pm_path).context("could not create serialization file.")?;
426    let mut pm_writer = BufWriter::new(&pm_file);
427    bincode::serialize_into(&mut pm_writer, &hm)
428        .context("couldn't serialize permit list mapping.")?;
429
430    let meta_info = json!({
431    "velo_mode" : velo_mode,
432    "expected_ori" : *expected_ori.strand_symbol(),
433    "version_str" : version,
434    "max-ambig-record" : max_ambiguity_read,
435    "cmd" : cmdline,
436    "permit-list-type" : "unfiltered",
437    "gpl_options" : &gpl_opts
438    });
439
440    let m_path = parent.join("generate_permit_list.json");
441    let mut m_file = std::fs::File::create(m_path).context("could not create metadata file.")?;
442
443    let meta_info_string =
444        serde_json::to_string_pretty(&meta_info).context("could not format json.")?;
445    m_file
446        .write_all(meta_info_string.as_bytes())
447        .context("cannot write to generate_permit_list.json file")?;
448
449    info!(
450        log,
451        "total number of distinct corrected barcodes : {}",
452        num_corrected.to_formatted_string(&Locale::en)
453    );
454
455    Ok(num_corrected)
456}
457
458#[allow(clippy::unnecessary_unwrap, clippy::too_many_arguments)]
459fn process_filtered(
460    hm: DashMap<u64, u64, ahash::RandomState>,
461    file_tag_map: &rad_types::TagMap,
462    filter_meth: &CellFilterMethod,
463    expected_ori: Strand,
464    output_dir: &PathBuf,
465    version: &str,
466    max_ambiguity_read: usize,
467    velo_mode: bool,
468    cmdline: &str,
469    log: &slog::Logger,
470    gpl_opts: &GenPermitListOpts,
471) -> anyhow::Result<u64> {
472    let valid_bc: Vec<u64>;
473    let hm: HashMap<u64, u64, ahash::RandomState> = hm.into_iter().collect();
474    let mut freq: Vec<u64> = hm.values().cloned().collect();
475    freq.sort_unstable();
476    freq.reverse();
477
478    let barcode_tag = file_tag_map
479        .get("cblen")
480        .expect("tag map must contain cblen");
481    let barcode_len: u16 = barcode_tag.try_into()?;
482
483    // select from among supported filter methods
484    match filter_meth {
485        CellFilterMethod::KneeFinding => {
486            let num_bc = get_knee(&freq[..], 100, log);
487            let min_freq = freq[num_bc];
488
489            // collect all of the barcodes that have a frequency
490            // >= to min_thresh.
491            valid_bc = permit_list_from_threshold(&hm, min_freq);
492            info!(
493                log,
494                "knee distance method resulted in the selection of {} permitted barcodes.",
495                valid_bc.len()
496            );
497        }
498        CellFilterMethod::ForceCells(top_k) => {
499            let num_bc = if freq.len() < *top_k {
500                freq.len() - 1
501            } else {
502                top_k - 1
503            };
504
505            let min_freq = freq[num_bc];
506
507            // collect all of the barcodes that have a frequency
508            // >= to min_thresh.
509            valid_bc = permit_list_from_threshold(&hm, min_freq);
510        }
511        CellFilterMethod::ExplicitList(valid_bc_file) => {
512            valid_bc = permit_list_from_file(valid_bc_file, barcode_len);
513        }
514        CellFilterMethod::ExpectCells(expected_num_cells) => {
515            let robust_quantile = 0.99f64;
516            let robust_div = 10.0f64;
517            let robust_ind = (*expected_num_cells as f64 * robust_quantile).round() as u64;
518            // the robust ind must be valid
519            let ind = cmp::min(freq.len() - 1, robust_ind as usize);
520            let robust_freq = freq[ind];
521            let min_freq = std::cmp::max(1u64, (robust_freq as f64 / robust_div).round() as u64);
522            valid_bc = permit_list_from_threshold(&hm, min_freq);
523        }
524        CellFilterMethod::UnfilteredExternalList(_, _min_reads) => {
525            unimplemented!();
526        }
527    }
528
529    // generate the map from each permitted barcode to all barcodes within
530    // edit distance 1 of it.
531    let full_permit_list =
532        afutils::generate_permitlist_map(&valid_bc, barcode_len as usize).unwrap();
533
534    let s2 = ahash::RandomState::with_seeds(2u64, 7u64, 1u64, 8u64);
535    let mut permitted_map = HashMap::with_capacity_and_hasher(valid_bc.len(), s2);
536
537    let mut num_corrected = 0;
538    for (k, v) in hm.iter() {
539        if let Some(&valid_key) = full_permit_list.get(k) {
540            *permitted_map.entry(valid_key).or_insert(0u64) += *v;
541            num_corrected += 1;
542            //println!("{} was a neighbor of {}, with count {}", k, valid_key, v);
543        }
544    }
545
546    let parent = std::path::Path::new(output_dir);
547    std::fs::create_dir_all(parent).with_context(|| {
548        format!(
549            "failed to create path to output location {}",
550            parent.display()
551        )
552    })?;
553    let o_path = parent.join("permit_freq.bin");
554
555    match afutils::write_permit_list_freq(&o_path, barcode_len, &permitted_map) {
556        Ok(_) => {}
557        Err(error) => {
558            panic!("Error: {}", error);
559        }
560    };
561
562    let o_path = parent.join("all_freq.bin");
563
564    match afutils::write_permit_list_freq(&o_path, barcode_len, &hm) {
565        Ok(_) => {}
566        Err(error) => {
567            panic!("Error: {}", error);
568        }
569    };
570
571    let s_path = parent.join("permit_map.bin");
572    let s_file = std::fs::File::create(s_path).context("could not create serialization file.")?;
573    let mut s_writer = BufWriter::new(&s_file);
574    bincode::serialize_into(&mut s_writer, &full_permit_list)
575        .context("couldn't serialize permit list.")?;
576
577    let meta_info = json!({
578    "velo_mode" : velo_mode,
579    "expected_ori" : *expected_ori.strand_symbol(),
580    "version_str" : version,
581    "max-ambig-record" : max_ambiguity_read,
582    "cmd" : cmdline,
583    "permit-list-type" : "filtered",
584    "gpl_options" : &gpl_opts
585    });
586
587    let m_path = parent.join("generate_permit_list.json");
588    let mut m_file = std::fs::File::create(m_path).context("could not create metadata file.")?;
589
590    let meta_info_string =
591        serde_json::to_string_pretty(&meta_info).context("could not format json.")?;
592    m_file
593        .write_all(meta_info_string.as_bytes())
594        .context("cannot write to generate_permit_list.json file")?;
595
596    info!(
597        log,
598        "total number of distinct corrected barcodes : {}",
599        num_corrected.to_formatted_string(&Locale::en)
600    );
601
602    Ok(num_corrected)
603}
604
605/// Given the input RAD file `input_file`, compute
606/// and output (in `output_dir`) the list of valid
607/// (i.e. "permitted") barcode values, as well as
608/// a map from each correctable barcode to the
609/// permitted barcode to which it maps.
610pub fn generate_permit_list(gpl_opts: GenPermitListOpts) -> anyhow::Result<u64> {
611    let rad_dir = gpl_opts.input_dir;
612    let output_dir = gpl_opts.output_dir;
613    let filter_meth = gpl_opts.fmeth.clone();
614    let expected_ori = gpl_opts.expected_ori;
615    let version = gpl_opts.version;
616    let velo_mode = gpl_opts.velo_mode;
617    let cmdline = gpl_opts.cmdline;
618    let log = gpl_opts.log;
619
620    let i_dir = std::path::Path::new(&rad_dir);
621
622    // should we assume this condition was already checked
623    // during parsing?
624    if !i_dir.exists() {
625        crit!(
626            log,
627            "the input RAD path {} does not exist",
628            rad_dir.display()
629        );
630        // std::process::exit(1);
631        anyhow::bail!("execution terminated because input RAD path does not exist.");
632    }
633
634    let mut first_bclen = 0usize;
635    let mut unfiltered_bc_counts = None;
636    if let CellFilterMethod::UnfilteredExternalList(fname, _) = &filter_meth {
637        let (reader, compression) = niffler::from_path(fname)
638            .with_context(|| format!("coult not open input file {}", fname.display()))?;
639        let br = BufReader::new(reader);
640
641        info!(
642            log,
643            "reading permit list from {}; inferred format {:#?}",
644            fname.display(),
645            compression
646        );
647
648        unfiltered_bc_counts = Some(populate_unfiltered_barcode_map(br, &mut first_bclen));
649        info!(
650            log,
651            "number of unfiltered bcs read = {}",
652            unfiltered_bc_counts
653                .as_ref()
654                .unwrap()
655                .len()
656                .to_formatted_string(&Locale::en)
657        );
658    }
659
660    let nworkers: usize = gpl_opts.threads;
661    let i_file = File::open(i_dir.join("map.rad")).context("could not open input rad file")?;
662    let ifile = BufReader::new(i_file);
663    let mut rad_reader = libradicl::readers::ParallelRadReader::<
664        AlevinFryReadRecord,
665        BufReader<File>,
666    >::new(ifile, NonZeroUsize::new(nworkers).unwrap());
667
668    let hdr = &rad_reader.prelude.hdr;
669    info!(
670        log,
671        "paired : {:?}, ref_count : {}, num_chunks : {}",
672        hdr.is_paired != 0,
673        hdr.ref_count.to_formatted_string(&Locale::en),
674        hdr.num_chunks.to_formatted_string(&Locale::en)
675    );
676    let num_chunks = hdr.num_chunks();
677
678    // file-level
679    let fl_tags = &rad_reader.prelude.file_tags;
680    info!(log, "read {:?} file-level tags", fl_tags.tags.len());
681    // read-level
682    let rl_tags = &rad_reader.prelude.read_tags;
683    info!(log, "read {:?} read-level tags", rl_tags.tags.len());
684
685    // right now, we only handle BC and UMI types of U8—U64, so validate that
686    const BNAME: &str = "b";
687    const UNAME: &str = "u";
688
689    let mut bct: Option<RadType> = None;
690    let mut umit: Option<RadType> = None;
691
692    for rt in &rl_tags.tags {
693        // if this is one of our tags
694        if rt.name == BNAME || rt.name == UNAME {
695            if !rt.typeid.is_int_type() {
696                crit!(
697                    log,
698                    "currently only RAD types 1--4 are supported for 'b' and 'u' tags."
699                );
700                std::process::exit(exit_codes::EXIT_UNSUPPORTED_TAG_TYPE);
701            }
702
703            if rt.name == BNAME {
704                bct = Some(rt.typeid);
705            }
706            if rt.name == UNAME {
707                umit = Some(rt.typeid);
708            }
709        }
710    }
711    assert!(bct.is_some(), "barcode type tag must be present.");
712    assert!(umit.is_some(), "umi type tag must be present.");
713
714    // alignment-level
715    let al_tags = &rad_reader.prelude.aln_tags;
716    info!(log, "read {:?} alignemnt-level tags", al_tags.tags.len());
717
718    {
719        let file_tag_map = &rad_reader.file_tag_map;
720        info!(log, "File-level tag values {:?}", file_tag_map);
721    }
722
723    let mut num_reads: usize = 0;
724
725    // if dealing with filtered type
726    let s = ahash::RandomState::with_seeds(2u64, 7u64, 1u64, 8u64);
727    let hm = std::sync::Arc::new(DashMap::with_hasher(s));
728
729    // if dealing with the unfiltered type
730    // the set of barcodes that are not an exact match for any known barcodes
731    let mut unmatched_bc: Vec<u64>;
732    let mut num_orientation_compat_reads = 0usize;
733    let mut max_ambiguity_read = 0usize;
734
735    let nc = num_chunks.expect("unknwon number of chunks").get() as u64;
736    let pbar = ProgressBar::new(nc);
737    pbar.set_style(
738        ProgressStyle::with_template(
739            "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
740        )
741        .unwrap()
742        .progress_chars("##-"),
743    );
744    pbar.set_draw_target(ProgressDrawTarget::stderr_with_hz(5));
745    let cb = |_new_bytes: u64, new_chunks: u64| {
746        pbar.inc(new_chunks);
747    };
748
749    match filter_meth {
750        CellFilterMethod::UnfilteredExternalList(_, _min_reads) => {
751            unmatched_bc = Vec::with_capacity(10000000);
752            // the unfiltered_bc_count map must be valid in this branch
753            if unfiltered_bc_counts.is_some() {
754                let hmu = std::thread::scope(|s| {
755                    let hmu = std::sync::Arc::new(unfiltered_bc_counts.unwrap());
756                    let mut handles = Vec::<
757                        std::thread::ScopedJoinHandle<(usize, usize, Vec<u64>, usize)>,
758                    >::new();
759                    for _ in 0..nworkers {
760                        let rd = rad_reader.is_done();
761                        let q = rad_reader.get_queue();
762                        let hmu = hmu.clone();
763                        let handle = s.spawn(move || {
764                            let mut unmatched_bc = Vec::<u64>::new();
765                            let mut max_ambiguity_read = 0usize;
766                            let mut num_reads = 0;
767                            let mut num_orientation_compat_reads = 0;
768                            while !rd.load(Ordering::SeqCst) || !q.is_empty() {
769                                while let Some(meta_chunk) = q.pop() {
770                                    for c in meta_chunk.iter() {
771                                        num_orientation_compat_reads +=
772                                            update_barcode_hist_unfiltered(
773                                                &hmu,
774                                                &mut unmatched_bc,
775                                                &mut max_ambiguity_read,
776                                                &c,
777                                                &expected_ori,
778                                            );
779                                        num_reads += c.reads.len();
780                                    }
781                                }
782                            }
783                            (
784                                num_reads,
785                                num_orientation_compat_reads,
786                                unmatched_bc,
787                                max_ambiguity_read,
788                            )
789                        });
790                        handles.push(handle);
791                    }
792                    let _ = rad_reader.start_chunk_parsing(Some(cb)); //libradicl::readers::EMPTY_METACHUNK_CALLBACK);
793                    for handle in handles {
794                        let (nr, nocr, ubc, mar) =
795                            handle.join().expect("The parsing thread panicked");
796                        num_reads += nr;
797                        num_orientation_compat_reads += nocr;
798                        unmatched_bc.extend_from_slice(&ubc);
799                        max_ambiguity_read = max_ambiguity_read.max(mar);
800                    }
801                    pbar.finish_with_message("finished parsing RAD file\n");
802                    // return the hash map we no longer need
803                    std::sync::Arc::<DashMap<u64, u64, ahash::RandomState>>::into_inner(hmu)
804                });
805                /*
806                for _ in 0..(hdr.num_chunks as usize) {
807                let c =
808                chunk::Chunk::<AlevinFryReadRecord>::from_bytes(&mut br, &record_context);
809                num_orientation_compat_reads += update_barcode_hist_unfiltered(
810                &mut hmu,
811                &mut unmatched_bc,
812                &mut max_ambiguity_read,
813                &c,
814                &expected_ori,
815                );
816                num_reads += c.reads.len();
817                }
818                */
819                info!(
820                        log,
821                        "observed {} reads ({} orientation consistent) in {} chunks --- max ambiguity read occurs in {} refs",
822                        num_reads.to_formatted_string(&Locale::en),
823                        num_orientation_compat_reads.to_formatted_string(&Locale::en),
824                        num_chunks.expect("nonzero").to_formatted_string(&Locale::en),
825                        max_ambiguity_read.to_formatted_string(&Locale::en)
826                    );
827                let valid_thresh = 0.3f64;
828                match diagnostics::likely_valid_permit_list(
829                    unmatched_bc.len(),
830                    num_reads,
831                    valid_thresh,
832                ) {
833                    Ok(f) => {
834                        info!(log,
835                        "The percentage of mapped reads not matching a known barcode exactly is {:.3}%, which is < the warning threshold {:.3}%",
836                        f * 100f64, valid_thresh * 100f64);
837                    }
838                    Err(e) => {
839                        warn!(log, "{:?}", e);
840                    }
841                }
842
843                process_unfiltered(
844                    hmu.unwrap(),
845                    unmatched_bc,
846                    &rad_reader.file_tag_map,
847                    &filter_meth,
848                    expected_ori,
849                    output_dir,
850                    version,
851                    max_ambiguity_read,
852                    velo_mode,
853                    cmdline,
854                    log,
855                    &gpl_opts,
856                )
857            } else {
858                Ok(0)
859            }
860        }
861        _ => {
862            let hm = std::thread::scope(|s| {
863                let mut handles =
864                    Vec::<std::thread::ScopedJoinHandle<(usize, usize, usize)>>::new();
865                for _ in 0..nworkers {
866                    let rd = rad_reader.is_done();
867                    let q = rad_reader.get_queue();
868                    let hm = hm.clone();
869                    let handle = s.spawn(move || {
870                        let mut max_ambiguity_read = 0usize;
871                        let mut num_reads = 0;
872                        while !rd.load(Ordering::SeqCst) || !q.is_empty() {
873                            while let Some(meta_chunk) = q.pop() {
874                                for c in meta_chunk.iter() {
875                                    update_barcode_hist(
876                                        &hm,
877                                        &mut max_ambiguity_read,
878                                        &c,
879                                        &expected_ori,
880                                    );
881                                    num_reads += c.reads.len();
882                                }
883                            }
884                        }
885                        (num_reads, num_orientation_compat_reads, max_ambiguity_read)
886                    });
887                    handles.push(handle);
888                }
889                let _ = rad_reader.start_chunk_parsing(Some(cb));
890                for handle in handles {
891                    let (nr, nocr, mar) = handle.join().expect("The parsing thread panicked");
892                    num_reads += nr;
893                    num_orientation_compat_reads += nocr;
894                    max_ambiguity_read = max_ambiguity_read.max(mar);
895                }
896                pbar.finish_with_message("finished parsing RAD file\n");
897                // return the hash map we no longer need
898                Arc::<DashMap<u64, u64, ahash::RandomState>>::into_inner(hm)
899                    .expect("unique reference to DashMap")
900            });
901            info!(
902                log,
903                "observed {} reads in {} chunks --- max ambiguity read occurs in {} refs",
904                num_reads.to_formatted_string(&Locale::en),
905                num_chunks.unwrap().to_formatted_string(&Locale::en),
906                max_ambiguity_read.to_formatted_string(&Locale::en)
907            );
908            process_filtered(
909                hm,
910                &rad_reader.file_tag_map,
911                &filter_meth,
912                expected_ori,
913                output_dir,
914                version,
915                max_ambiguity_read,
916                velo_mode,
917                cmdline,
918                log,
919                &gpl_opts,
920            )
921        }
922    }
923
924    /*
925    let valid_bc: Vec<u64>;
926    let mut freq: Vec<u64> = hm.values().cloned().collect();
927    freq.sort_unstable();
928    freq.reverse();
929
930    // select from among supported filter methods
931    match filter_meth {
932    CellFilterMethod::KneeFinding => {
933        let num_bc = get_knee(&freq[..], 100, &log);
934        let min_freq = freq[num_bc];
935
936        // collect all of the barcodes that have a frequency
937        // >= to min_thresh.
938        valid_bc = libradicl::permit_list_from_threshold(&hm, min_freq);
939        info!(
940        log,
941        "knee distance method resulted in the selection of {} permitted barcodes.",
942        valid_bc.len()
943        );
944    }
945    CellFilterMethod::ForceCells(top_k) => {
946        let num_bc = if freq.len() < top_k {
947        freq.len() - 1
948        } else {
949        top_k - 1
950        };
951
952        let min_freq = freq[num_bc];
953
954        // collect all of the barcodes that have a frequency
955        // >= to min_thresh.
956        valid_bc = libradicl::permit_list_from_threshold(&hm, min_freq);
957    }
958    CellFilterMethod::ExplicitList(valid_bc_file) => {
959        valid_bc = libradicl::permit_list_from_file(valid_bc_file, ft_vals.bclen);
960    }
961    CellFilterMethod::ExpectCells(expected_num_cells) => {
962        let robust_quantile = 0.99f64;
963        let robust_div = 10.0f64;
964        let robust_ind = (expected_num_cells as f64 * robust_quantile).round() as u64;
965        // the robust ind must be valid
966        let ind = cmp::min(freq.len() - 1, robust_ind as usize);
967        let robust_freq = freq[ind];
968        let min_freq = std::cmp::max(1u64, (robust_freq as f64 / robust_div).round() as u64);
969        valid_bc = libradicl::permit_list_from_threshold(&hm, min_freq);
970    }
971    CellFilterMethod::UnfilteredExternalList(_, min_reads) => {
972        unimplemented!();
973    }
974    }
975
976    // generate the map from each permitted barcode to all barcodes within
977    // edit distance 1 of it.
978    let full_permit_list =
979    libradicl::utils::generate_permitlist_map(&valid_bc, ft_vals.bclen as usize).unwrap();
980
981    let s2 = RandomState::<Hash64>::new();
982    let mut permitted_map = HashMap::with_capacity_and_hasher(valid_bc.len(), s2);
983
984    let mut num_corrected = 0;
985    for (k, v) in hm.iter() {
986    if let Some(&valid_key) = full_permit_list.get(k) {
987        *permitted_map.entry(valid_key).or_insert(0u64) += *v;
988        num_corrected += 1;
989        //println!("{} was a neighbor of {}, with count {}", k, valid_key, v);
990    }
991    }
992
993    let parent = std::path::Path::new(&output_dir);
994    std::fs::create_dir_all(&parent).unwrap();
995    let o_path = parent.join("permit_freq.tsv");
996    let output = std::fs::File::create(&o_path).expect("could not create output.");
997    let mut writer = BufWriter::new(&output);
998
999    for (k, v) in permitted_map {
1000    //let bc_mer: BitKmer = (k, ft_vals.bclen as u8);
1001    writeln!(
1002        &mut writer,
1003        "{}\t{}",
1004        k,
1005        //from_utf8(&bitmer_to_bytes(bc_mer)[..]).unwrap(),
1006        v
1007    )
1008    .expect("couldn't write to output file.");
1009    }
1010
1011    let o_path = parent.join("all_freq.tsv");
1012    let output = std::fs::File::create(&o_path).expect("could not create output.");
1013    let mut writer = BufWriter::new(&output);
1014    for (k, v) in hm {
1015    let bc_mer: BitKmer = (k, ft_vals.bclen as u8);
1016    writeln!(
1017        &mut writer,
1018        "{}\t{}",
1019        from_utf8(&bitmer_to_bytes(bc_mer)[..]).unwrap(),
1020        v
1021    )
1022    .expect("couldn't write to output file.");
1023    }
1024
1025    let s_path = parent.join("permit_map.bin");
1026    let s_file = std::fs::File::create(&s_path).expect("could not create serialization file.");
1027    let mut s_writer = BufWriter::new(&s_file);
1028    bincode::serialize_into(&mut s_writer, &full_permit_list)
1029    .expect("couldn't serialize permit list.");
1030
1031    let meta_info = json!({
1032    "expected_ori" : expected_ori.strand_symbol()
1033    });
1034
1035    let m_path = parent.join("generate_permit_list.json");
1036    let mut m_file = std::fs::File::create(&m_path).expect("could not create metadata file.");
1037
1038    let meta_info_string =
1039    serde_json::to_string_pretty(&meta_info).expect("could not format json.");
1040    m_file
1041    .write_all(meta_info_string.as_bytes())
1042    .expect("cannot write to generate_permit_list.json file");
1043
1044    info!(
1045    log,
1046    "total number of corrected barcodes : {}",
1047    num_corrected.to_formatted_string(&Locale::en)
1048    );
1049    Ok(num_corrected)
1050    */
1051}
1052
1053pub fn update_barcode_hist_unfiltered(
1054    hist: &DashMap<u64, u64, ahash::RandomState>,
1055    unmatched_bc: &mut Vec<u64>,
1056    max_ambiguity_read: &mut usize,
1057    chunk: &chunk::Chunk<AlevinFryReadRecord>,
1058    expected_ori: &Strand,
1059) -> usize {
1060    let mut num_strand_compat_reads = 0usize;
1061    match expected_ori {
1062        Strand::Unknown => {
1063            for r in &chunk.reads {
1064                num_strand_compat_reads += 1;
1065                *max_ambiguity_read = r.refs.len().max(*max_ambiguity_read);
1066                // lookup the barcode in the map of unfiltered known
1067                // barcodes
1068                match hist.get_mut(&r.bc) {
1069                    // if we find a match, increment the count
1070                    Some(mut c) => *c += 1,
1071                    // otherwise, push this into the unmatched list
1072                    None => {
1073                        unmatched_bc.push(r.bc);
1074                    }
1075                }
1076            }
1077        }
1078        Strand::Forward => {
1079            for r in &chunk.reads {
1080                if r.dirs.iter().any(|&x| x) {
1081                    num_strand_compat_reads += 1;
1082                    *max_ambiguity_read = r.refs.len().max(*max_ambiguity_read);
1083                    // lookup the barcode in the map of unfiltered known
1084                    // barcodes
1085                    match hist.get_mut(&r.bc) {
1086                        // if we find a match, increment the count
1087                        Some(mut c) => *c += 1,
1088                        // otherwise, push this into the unmatched list
1089                        None => {
1090                            unmatched_bc.push(r.bc);
1091                        }
1092                    }
1093                }
1094            }
1095        }
1096        Strand::Reverse => {
1097            for r in &chunk.reads {
1098                if r.dirs.iter().any(|&x| !x) {
1099                    num_strand_compat_reads += 1;
1100                    *max_ambiguity_read = r.refs.len().max(*max_ambiguity_read);
1101                    // lookup the barcode in the map of unfiltered known
1102                    // barcodes
1103                    match hist.get_mut(&r.bc) {
1104                        // if we find a match, increment the count
1105                        Some(mut c) => *c += 1,
1106                        // otherwise, push this into the unmatched list
1107                        None => {
1108                            unmatched_bc.push(r.bc);
1109                        }
1110                    }
1111                }
1112            }
1113        }
1114    }
1115    num_strand_compat_reads
1116}
1117
1118pub fn update_barcode_hist(
1119    hist: &DashMap<u64, u64, ahash::RandomState>,
1120    max_ambiguity_read: &mut usize,
1121    chunk: &chunk::Chunk<AlevinFryReadRecord>,
1122    expected_ori: &Strand,
1123) {
1124    match expected_ori {
1125        Strand::Unknown => {
1126            for r in &chunk.reads {
1127                *max_ambiguity_read = r.refs.len().max(*max_ambiguity_read);
1128                *hist.entry(r.bc).or_insert(0) += 1;
1129            }
1130        }
1131        Strand::Forward => {
1132            for r in &chunk.reads {
1133                if r.dirs.iter().any(|&x| x) {
1134                    *max_ambiguity_read = r.refs.len().max(*max_ambiguity_read);
1135                    *hist.entry(r.bc).or_insert(0) += 1;
1136                }
1137            }
1138        }
1139        Strand::Reverse => {
1140            for r in &chunk.reads {
1141                if r.dirs.iter().any(|&x| !x) {
1142                    *max_ambiguity_read = r.refs.len().max(*max_ambiguity_read);
1143                    *hist.entry(r.bc).or_insert(0) += 1;
1144                }
1145            }
1146        }
1147    }
1148}
1149
1150pub fn permit_list_from_threshold(
1151    hist: &HashMap<u64, u64, ahash::RandomState>,
1152    min_freq: u64,
1153) -> Vec<u64> {
1154    let valid_bc: Vec<u64> = hist
1155        .iter()
1156        .filter_map(|(k, v)| if v >= &min_freq { Some(*k) } else { None })
1157        .collect();
1158    valid_bc
1159}
1160
1161pub fn permit_list_from_file<P>(ifile: P, bclen: u16) -> Vec<u64>
1162where
1163    P: AsRef<Path>,
1164{
1165    let f = File::open(ifile).expect("couldn't open input barcode file.");
1166    let br = BufReader::new(f);
1167    let mut bc = Vec::<u64>::with_capacity(10_000);
1168
1169    for l in br.lines() {
1170        let line = l.expect("couldn't read line from barcode file.");
1171        let mut bnk = BitNuclKmer::new(line.as_bytes(), bclen as u8, false);
1172        let (_, k, _) = bnk.next().expect("can't extract kmer");
1173        bc.push(k.0);
1174    }
1175    bc
1176}