1use anyhow::Context;
11use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
12
13#[allow(unused_imports)]
14use slog::{crit, info, warn};
15
16use needletail::bitkmer::*;
17use num_format::{Locale, ToFormattedString};
18use serde::Serialize;
19use serde_json::json;
20use std::collections::{HashMap, HashSet};
21use std::fs;
22use std::fs::File;
23use std::io::Write;
24use std::io::{BufRead, BufReader, BufWriter};
25use std::str::FromStr;
26use std::string::ToString;
27use std::sync::atomic::{AtomicUsize, Ordering};
28use std::sync::{Arc, Mutex};
29use std::thread;
30
31use std::fmt;
32use flate2::write::GzEncoder;
35use flate2::Compression;
36
37use crate::em::{em_optimize, em_optimize_subset, run_bootstrap, EmInitType};
38use crate::eq_class::{EqMap, EqMapType, IndexedEqList};
39use crate::prog_opts::QuantOpts;
40use crate::pugutils;
41use crate::utils as afutils;
42
43use libradicl::{
44 chunk,
45 header::RadPrelude,
46 record::{AlevinFryReadRecord, AlevinFryRecordContext},
47};
48
49type BufferedGzFile = BufWriter<GzEncoder<fs::File>>;
50
51#[derive(PartialEq, Eq, Debug, Default, Clone, Copy, Serialize)]
52pub enum SplicedAmbiguityModel {
53 PreferAmbiguity,
54 #[default]
55 WinnerTakeAll,
56}
57
58impl FromStr for SplicedAmbiguityModel {
60 type Err = &'static str;
61 fn from_str(s: &str) -> Result<Self, Self::Err> {
62 match s {
63 "prefer-ambig" => Ok(SplicedAmbiguityModel::PreferAmbiguity),
64 "winner-take-all" => Ok(SplicedAmbiguityModel::WinnerTakeAll),
65 _ => Err("no match"),
66 }
67 }
68}
69
70#[derive(PartialEq, Eq, Debug, Default, Clone, Copy, Serialize)]
71pub enum ResolutionStrategy {
72 Trivial,
73 #[default]
74 CellRangerLike,
75 CellRangerLikeEm,
76 ParsimonyEm,
77 Parsimony,
78 ParsimonyGeneEm,
79 ParsimonyGene,
80}
81
82impl fmt::Display for ResolutionStrategy {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 write!(f, "{:?}", self)
85 }
86}
87
88impl FromStr for ResolutionStrategy {
90 type Err = &'static str;
91 fn from_str(s: &str) -> Result<Self, Self::Err> {
92 match s {
93 "trivial" => Ok(ResolutionStrategy::Trivial),
94 "cr-like" => Ok(ResolutionStrategy::CellRangerLike),
95 "cr-like-em" => Ok(ResolutionStrategy::CellRangerLikeEm),
96 "parsimony-em" => Ok(ResolutionStrategy::ParsimonyEm),
97 "parsimony" => Ok(ResolutionStrategy::Parsimony),
98 "parsimony-gene" => Ok(ResolutionStrategy::ParsimonyGene),
99 "parsimony-gene-em" => Ok(ResolutionStrategy::ParsimonyGeneEm),
100 _ => Err("no match"),
101 }
102 }
103}
104
105struct BootstrapHelper {
106 bsfile: Option<BufferedGzFile>,
107 mean_var_files: Option<(BufferedGzFile, BufferedGzFile)>,
108}
109
110impl BootstrapHelper {
111 fn new(
112 output_path: &std::path::Path,
113 num_bootstraps: u32,
114 summary_stat: bool,
115 ) -> BootstrapHelper {
116 if num_bootstraps > 0 {
117 if summary_stat {
118 let bootstrap_mean_path = output_path.join("bootstraps_mean.eds.gz");
119 let bootstrap_var_path = output_path.join("bootstraps_var.eds.gz");
120 let bt_mean_buffered = GzEncoder::new(
121 fs::File::create(bootstrap_mean_path).unwrap(),
122 Compression::default(),
123 );
124 let bt_var_buffered = GzEncoder::new(
125 fs::File::create(bootstrap_var_path).unwrap(),
126 Compression::default(),
127 );
128 BootstrapHelper {
129 bsfile: None,
130 mean_var_files: Some((
131 BufWriter::new(bt_mean_buffered),
132 BufWriter::new(bt_var_buffered),
133 )),
134 }
135 } else {
136 let bootstrap_path = output_path.join("bootstraps.eds.gz");
137 let bt_buffered = GzEncoder::new(
138 fs::File::create(bootstrap_path).unwrap(),
139 Compression::default(),
140 );
141 BootstrapHelper {
142 bsfile: Some(BufWriter::new(bt_buffered)),
143 mean_var_files: None,
144 }
145 }
146 } else {
147 BootstrapHelper {
148 bsfile: None,
149 mean_var_files: None,
150 }
151 }
152 }
153}
154
155struct QuantOutputInfo {
156 barcode_file: BufWriter<fs::File>,
157 eds_file: BufWriter<GzEncoder<fs::File>>,
158 feature_file: BufWriter<fs::File>,
159 trimat: sprs::TriMatI<f32, u32>,
160 row_index: usize,
161 bootstrap_helper: BootstrapHelper, }
163
164struct EqcMap {
165 global_eqc: HashMap<Vec<u32>, u64, ahash::RandomState>,
167 cell_level_count: Vec<(u64, u32)>,
170 cell_offset: Vec<(usize, usize)>,
173}
174
175fn write_eqc_counts(
176 eqid_map_lock: &Arc<Mutex<EqcMap>>,
177 num_genes: usize,
178 usa_mode: bool,
179 output_path: &std::path::Path,
180 log: &slog::Logger,
181) -> anyhow::Result<bool> {
182 let eqmap_deref = eqid_map_lock.lock();
183 let geqmap = eqmap_deref.unwrap();
184 let num_eqclasses = geqmap.global_eqc.len();
185
186 info!(
187 log,
188 "Writing gene-level equivalence class with {:?} classes",
189 geqmap.global_eqc.len()
190 );
191
192 let mut eqmat = sprs::TriMatI::<f32, u32>::with_capacity(
194 (geqmap.cell_offset.len(), num_eqclasses), geqmap.cell_level_count.len(), );
197
198 let mut global_offset = 0usize;
200 for (row_index, num_cell_eqs) in geqmap.cell_offset.iter() {
201 let slice = global_offset..(global_offset + num_cell_eqs);
202 for (eqid, umi_count) in geqmap.cell_level_count[slice].iter() {
203 eqmat.add_triplet(*row_index, *eqid as usize, *umi_count as f32);
204 }
205 global_offset += num_cell_eqs;
206 }
207
208 let mtx_path = output_path.join("geqc_counts.mtx");
210 sprs::io::write_matrix_market(mtx_path, &eqmat).context("could not write geqc_counts.mtx")?;
211
212 let gn_eq_path = output_path.join("gene_eqclass.txt.gz");
214 let mut gn_eq_writer = BufWriter::new(GzEncoder::new(
215 fs::File::create(gn_eq_path).unwrap(),
216 Compression::default(),
217 ));
218
219 gn_eq_writer
221 .write_all(format!("{}\n", num_genes).as_bytes())
222 .context("could not write to gene_eqclass.txt.gz")?;
223
224 gn_eq_writer
226 .write_all(format!("{}\n", num_eqclasses).as_bytes())
227 .context("could not write to gene_eqclass.txt.gz")?;
228
229 if usa_mode {
234 let unspliced_offset = (num_genes / 3) as u32;
241 let ambig_offset = 2 * unspliced_offset;
243 let mut gl;
245
246 for (gene_list, eqid) in geqmap.global_eqc.iter() {
249 let mut peekable_arr = gene_list.iter().peekable();
252 while let Some(cg) = peekable_arr.next() {
254 if let Some(ng) = peekable_arr.peek() {
256 if afutils::same_gene(*cg, **ng, true) {
260 gl = (cg >> 1) + ambig_offset;
261 gn_eq_writer
262 .write_all(format!("{}\t", gl).as_bytes())
263 .expect("could not write to gene_eqclass.txt.gz");
264 peekable_arr.next();
267 continue;
268 }
269 }
270 if afutils::is_spliced(*cg) {
274 gl = cg >> 1;
275 } else {
276 gl = (cg >> 1) + unspliced_offset;
277 }
278 gn_eq_writer
279 .write_all(format!("{}\t", gl).as_bytes())
280 .context("could not write to gene_eqclass.txt.gz")?;
281 }
282 gn_eq_writer
283 .write_all(format!("{}\n", eqid).as_bytes())
284 .context("could not write to gene_eqclass.txt.gz")?;
285 }
286 } else {
287 for (gene_list, eqid) in geqmap.global_eqc.iter() {
290 for g in gene_list.iter() {
291 gn_eq_writer
292 .write_all(format!("{}\t", g).as_bytes())
293 .context("could not write to gene_eqclass.txt.gz")?;
294 }
295 gn_eq_writer
296 .write_all(format!("{}\n", eqid).as_bytes())
297 .context("could not write to gene_eqclass.txt.gz")?;
298 }
299 }
300 Ok(true)
301}
302
303pub fn quantify(quant_opts: QuantOpts) -> anyhow::Result<()> {
306 let parent = std::path::Path::new(quant_opts.input_dir);
307 let log = quant_opts.log;
308
309 let collate_md_file =
311 File::open(parent.join("collate.json")).context("could not open the collate.json file.")?;
312 let collate_md: serde_json::Value = serde_json::from_reader(&collate_md_file)?;
313
314 let compressed_input = collate_md["compressed_output"]
316 .as_bool()
317 .context("could not read compressed_output field from collate metadata.")?;
318
319 if compressed_input {
320 let i_file =
321 File::open(parent.join("map.collated.rad.sz")).context("run collate before quant")?;
322 let br = BufReader::new(snap::read::FrameDecoder::new(&i_file));
323
324 info!(
325 log,
326 "quantifying from compressed, collated RAD file {:?}", i_file
327 );
328
329 do_quantify(br, quant_opts)
330 } else {
331 let i_file =
332 File::open(parent.join("map.collated.rad")).context("run collate before quant")?;
333 let br = BufReader::new(&i_file);
334
335 info!(
336 log,
337 "quantifying from uncompressed, collated RAD file {:?}", i_file
338 );
339
340 do_quantify(br, quant_opts)
341 }
342}
343
344pub fn do_quantify<T: BufRead>(mut br: T, quant_opts: QuantOpts) -> anyhow::Result<()> {
347 let parent = std::path::Path::new(quant_opts.input_dir);
348
349 let prelude = RadPrelude::from_bytes(&mut br)?;
350 let hdr = &prelude.hdr;
351 let init_uniform = quant_opts.init_uniform;
354 let summary_stat = quant_opts.summary_stat;
355 let dump_eq = quant_opts.dump_eq;
356 let use_mtx = quant_opts.use_mtx;
357 let resolution = quant_opts.resolution;
358 let pug_exact_umi = quant_opts.pug_exact_umi;
359 let mut sa_model = quant_opts.sa_model;
360 let small_thresh = quant_opts.small_thresh;
361 let large_graph_thresh = quant_opts.large_graph_thresh;
362 let filter_list = quant_opts.filter_list;
363 let log = quant_opts.log;
364 let num_threads = quant_opts.num_threads;
365 let num_bootstraps = quant_opts.num_bootstraps;
366
367 let mut num_cells = hdr.num_chunks;
372
373 info!(
374 log,
375 "paired : {:?}, ref_count : {}, num_chunks : {}",
376 hdr.is_paired != 0,
377 hdr.ref_count.to_formatted_string(&Locale::en),
378 hdr.num_chunks.to_formatted_string(&Locale::en)
379 );
380
381 let rnhasher = ahash::RandomState::with_seeds(2u64, 7u64, 1u64, 8u64);
386 let mut rname_to_id: HashMap<String, u32, ahash::RandomState> =
387 HashMap::with_capacity_and_hasher(hdr.ref_count as usize, rnhasher);
388 for (i, n) in hdr.ref_names.iter().enumerate() {
389 rname_to_id.insert(n.clone(), i as u32);
390 }
391 let mut gene_names: Vec<String> = Vec::with_capacity((hdr.ref_count / 2) as usize);
395 let gnhasher = ahash::RandomState::with_seeds(2u64, 7u64, 1u64, 8u64);
396 let mut gene_name_to_id: HashMap<String, u32, ahash::RandomState> =
397 HashMap::with_hasher(gnhasher);
398
399 let usa_mode;
400 let tid_to_gid;
401 match afutils::parse_tg_map(
407 quant_opts.tg_map,
408 hdr.ref_count as usize,
409 &rname_to_id,
410 &mut gene_names,
411 &mut gene_name_to_id,
412 ) {
413 Ok((v, us)) => {
414 tid_to_gid = v;
415 usa_mode = us;
416 if usa_mode {
417 assert_eq!(
418 num_bootstraps, 0,
419 "currently USA-mode (all-in-one unspliced/spliced/ambiguous) analysis cannot be used with bootstrapping."
420 );
421
422 match resolution {
423 ResolutionStrategy::Parsimony
424 | ResolutionStrategy::ParsimonyEm
425 | ResolutionStrategy::ParsimonyGene
426 | ResolutionStrategy::ParsimonyGeneEm => {
427 info!(log,
428 "currently USA-mode (all-in-one unspliced/spliced/ambiguous) analysis using parsimony(-gene) or parsimony(-gene)-em resolution is EXPERIMENTAL."
429 );
430 }
431 _ => {}
432 }
433 } else {
434 match sa_model {
438 SplicedAmbiguityModel::WinnerTakeAll => {}
439 _ => {
440 info!(
441 log,
442 "When not operating in USA-mode (all-in-one unspliced/spliced/ambiguous), the SplicedAmbiguityModel will be ignored."
443 );
444 sa_model = SplicedAmbiguityModel::WinnerTakeAll;
445 }
446 }
447 }
448 }
449 Err(e) => {
450 return Err(e);
451 }
452 }
453
454 info!(
455 log,
456 "tg-map contained {} genes mapping to {} transcripts.",
457 gene_names.len().to_formatted_string(&Locale::en),
458 tid_to_gid.len().to_formatted_string(&Locale::en)
459 );
460
461 let bc_unmapped_file =
463 std::fs::File::open(parent.join("unmapped_bc_count_collated.bin")).unwrap();
464 let bc_unmapped_map: Arc<HashMap<u64, u32>> =
465 Arc::new(bincode::deserialize_from(&bc_unmapped_file).unwrap());
466
467 let fl_tags = &prelude.file_tags;
469 info!(log, "read {:?} file-level tags", fl_tags.tags.len());
470 let rl_tags = &prelude.read_tags;
472 info!(log, "read {:?} read-level tags", rl_tags.tags.len());
473 let al_tags = &prelude.aln_tags;
475 info!(log, "read {:?} alignemnt-level tags", al_tags.tags.len());
476
477 let file_tag_map = prelude.file_tags.parse_tags_from_bytes(&mut br)?;
478 info!(log, "File-level tag values {:?}", file_tag_map);
479
480 let barcode_tag = file_tag_map
481 .get("cblen")
482 .expect("tag map must contain cblen");
483 let barcode_len: u16 = barcode_tag.try_into()?;
484
485 let mut retained_bc: Option<HashSet<u64, ahash::RandomState>> = None;
487 if let Some(fname) = filter_list {
488 match afutils::read_filter_list(fname, barcode_len) {
489 Ok(fset) => {
490 num_cells = fset.len() as u64;
493 retained_bc = Some(fset);
494 }
495 Err(e) => {
496 return Err(e);
497 }
498 }
499 }
500
501 let mut _num_reads: usize = 0;
502
503 let pbar = ProgressBar::with_draw_target(
504 Some(num_cells),
505 ProgressDrawTarget::stderr_with_hz(5u8), );
507 pbar.set_style(
508 ProgressStyle::default_bar()
509 .template(
510 "{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos:>7}/{len:7} {msg}",
511 )
512 .expect("ProgressStyle template was invalid.")
513 .progress_chars("╢▌▌░╟"),
514 );
515
516 let n_workers = if num_threads > 1 {
531 (num_threads - 1) as usize
532 } else {
533 1
534 };
535 let mut chunk_reader = libradicl::readers::ParallelChunkReader::<AlevinFryReadRecord>::new(
536 &prelude,
537 std::num::NonZeroUsize::new(n_workers).unwrap(),
538 );
539
540 let cells_to_process = Arc::new(AtomicUsize::new(num_cells as usize));
542 let tid_to_gid_shared = std::sync::Arc::new(tid_to_gid);
544 let ref_count = hdr.ref_count as u32;
546
547 let num_genes = gene_name_to_id.len();
549
550 let output_path = std::path::Path::new(quant_opts.output_dir);
552 fs::create_dir_all(output_path)?;
553
554 let output_matrix_path = output_path.join("alevin");
556 fs::create_dir_all(&output_matrix_path)?;
557
558 let bc_path = output_matrix_path.join("quants_mat_rows.txt");
560 let bc_file = fs::File::create(bc_path)?;
561
562 let mat_path = output_matrix_path.join("quants_mat.gz");
563 let boot_helper = BootstrapHelper::new(output_path, num_bootstraps, summary_stat);
564 let buffered = GzEncoder::new(fs::File::create(&mat_path)?, Compression::default());
565
566 let ff_path = output_path.join("featureDump.txt");
567 let mut ff_file = fs::File::create(ff_path)?;
568 writeln!(
569 ff_file,
570 "CB\tCorrectedReads\tMappedReads\tDeduplicatedReads\tMappingRate\tDedupRate\tMeanByMax\tNumGenesExpressed\tNumGenesOverMean"
571 )?;
572 let alt_res_cells = Arc::new(Mutex::new(Vec::<u64>::new()));
573 let empty_resolved_cells = Arc::new(Mutex::new(Vec::<u64>::new()));
574
575 let tmcap = if use_mtx {
576 (0.1f64 * num_genes as f64 * num_cells as f64).round() as usize
577 } else {
578 0usize
579 };
580
581 let num_rows = if usa_mode {
583 let mid = (gene_name_to_id
590 .values()
591 .max()
592 .expect("gene name to id map should not be empty.")
593 + 2) as usize;
594
595 mid + (mid / 2)
598 } else {
599 num_genes
600 };
601
602 let usa_offsets = if usa_mode {
603 Some((num_rows / 3, (2 * num_rows / 3)))
604 } else {
605 None
606 };
607
608 let trimat = sprs::TriMatI::<f32, u32>::with_capacity((num_cells as usize, num_rows), tmcap);
609
610 let bc_writer = Arc::new(Mutex::new(QuantOutputInfo {
611 barcode_file: BufWriter::new(bc_file),
612 eds_file: BufWriter::new(buffered),
613 feature_file: BufWriter::new(ff_file),
614 trimat,
615 row_index: 0usize,
616 bootstrap_helper: boot_helper,
617 }));
618
619 let mmrate = Arc::new(Mutex::new(vec![0f64; num_cells as usize]));
620
621 let mut thread_handles: Vec<thread::JoinHandle<usize>> = Vec::with_capacity(n_workers);
622
623 let so = ahash::RandomState::with_seeds(2u64, 7u64, 1u64, 8u64);
630 let eqid_map_lock = Arc::new(Mutex::new(EqcMap {
631 global_eqc: HashMap::with_hasher(so),
632 cell_level_count: Vec::new(),
633 cell_offset: Vec::new(),
634 }));
635
636 for _worker in 0..n_workers {
638 let in_q = chunk_reader.get_queue();
641 let is_done = chunk_reader.is_done();
642 let log = log.clone();
644 let tid_to_gid = tid_to_gid_shared.clone();
646 let cells_remaining = cells_to_process.clone();
648
649 let bcout = bc_writer.clone();
651 let eqid_map_lockc = eqid_map_lock.clone();
653 let bclen = barcode_len;
655 let alt_res_cells = alt_res_cells.clone();
656 let empty_resolved_cells = empty_resolved_cells.clone();
657 let unmapped_count = bc_unmapped_map.clone();
658 let mmrate = mmrate.clone();
659
660 let eq_map_type = match resolution {
664 ResolutionStrategy::ParsimonyGene | ResolutionStrategy::ParsimonyGeneEm => {
665 EqMapType::GeneLevel
666 }
667 _ => EqMapType::TranscriptLevel,
668 };
669
670 let num_eq_targets = match eq_map_type {
671 EqMapType::TranscriptLevel => ref_count,
672 EqMapType::GeneLevel => {
673 gene_name_to_id
676 .values()
677 .max()
678 .expect("gene name to id map should not be empty.")
679 + 2
680 }
681 };
682
683 let handle = std::thread::spawn(move || {
685 let mut unique_evidence = vec![false; num_rows];
688 let mut no_ambiguity = vec![false; num_rows];
689 let mut eq_map = EqMap::new(num_eq_targets, eq_map_type);
690 let mut expressed_vec = Vec::<f32>::with_capacity(num_genes);
691 let mut expressed_ind = Vec::<usize>::with_capacity(num_genes);
692 let mut eds_bytes = Vec::<u8>::new();
693 let mut bt_eds_bytes: Vec<u8> = Vec::new();
694 let mut eds_mean_bytes: Vec<u8> = Vec::new();
695 let mut eds_var_bytes: Vec<u8> = Vec::new();
696
697 let s = ahash::RandomState::with_seeds(2u64, 7u64, 1u64, 8u64);
709 let mut gene_eqc: HashMap<Vec<u32>, u32, ahash::RandomState> = HashMap::with_hasher(s);
710
711 let em_init_type = if init_uniform {
712 EmInitType::Uniform
713 } else {
714 EmInitType::Informative
715 };
716
717 let mut idx_eq_list = IndexedEqList::new();
721 let mut eq_id_count = Vec::<(u32, u32)>::new();
722
723 let mut local_nrec = 0usize;
724 while !is_done.load(Ordering::SeqCst) || !in_q.is_empty() {
727 while let Some(meta_chunk) = in_q.pop() {
728 let first_cell_in_chunk = meta_chunk.first_chunk_index;
729 for (cn, mut c) in meta_chunk.iter().enumerate() {
730 cells_remaining.fetch_sub(1, Ordering::SeqCst);
731 let cell_num = first_cell_in_chunk + cn;
732
733 let nbytes = c.nbytes;
734 let nrec = c.nrec;
735 local_nrec += nrec as usize;
736
737 if c.reads.is_empty() {
738 warn!(log, "Discovered empty chunk; should not happen! cell_num = {}, nbytes = {}, nrec = {}", cell_num, nbytes, nrec);
739 }
740
741 let bc = c.reads.first().expect("chunk with no reads").bc;
745
746 let mut counts: Vec<f32>;
749 let mut alt_resolution = false;
750
751 let mut bootstraps: Vec<Vec<f32>> = Vec::new();
752
753 let non_trivial = c.reads.len() >= small_thresh;
754 if non_trivial {
755 let small_cell = c.reads.len() <= 250;
758
759 match resolution {
763 ResolutionStrategy::CellRangerLike
764 | ResolutionStrategy::CellRangerLikeEm => {
765 if small_cell {
766 pugutils::get_num_molecules_cell_ranger_like_small(
767 &mut c,
768 &tid_to_gid,
769 num_genes,
770 &mut gene_eqc,
771 sa_model,
772 &log,
773 );
774 } else {
775 eq_map.init_from_chunk(&mut c);
776 pugutils::get_num_molecules_cell_ranger_like(
777 &eq_map,
778 &tid_to_gid,
779 num_genes,
780 &mut gene_eqc,
781 sa_model,
782 &log,
783 );
784 eq_map.clear();
785 }
786 let only_unique =
787 resolution == ResolutionStrategy::CellRangerLike;
788
789 match (usa_mode, only_unique) {
792 (true, true) => {
793 counts = afutils::extract_counts(&gene_eqc, num_rows);
795 }
796 (true, false) => {
797 afutils::extract_usa_eqmap(
799 &gene_eqc,
800 num_rows,
801 &mut idx_eq_list,
802 &mut eq_id_count,
803 );
804 counts = em_optimize_subset(
805 &idx_eq_list,
806 &eq_id_count,
807 &mut unique_evidence,
808 &mut no_ambiguity,
809 em_init_type,
810 num_rows,
811 only_unique,
812 usa_offsets,
813 &log,
814 );
815 }
816 (false, _) => {
817 counts = em_optimize(
819 &gene_eqc,
820 &mut unique_evidence,
821 &mut no_ambiguity,
822 em_init_type,
823 num_genes,
824 only_unique,
825 &log,
826 );
827 }
828 }
829 }
830 ResolutionStrategy::Trivial => {
831 eq_map.init_from_chunk(&mut c);
832 let ct = pugutils::get_num_molecules_trivial_discard_all_ambig(
833 &eq_map,
834 &tid_to_gid,
835 num_genes,
836 &log,
837 );
838 counts = ct.0;
839 mmrate.lock().unwrap()[cell_num] = ct.1;
840 eq_map.clear();
841 }
842 ResolutionStrategy::Parsimony
843 | ResolutionStrategy::ParsimonyEm
844 | ResolutionStrategy::ParsimonyGene
845 | ResolutionStrategy::ParsimonyGeneEm => {
846 if (resolution == ResolutionStrategy::ParsimonyGene)
847 || (resolution == ResolutionStrategy::ParsimonyGeneEm)
848 {
849 eq_map.init_from_chunk_gene_level(&mut c, &tid_to_gid);
850 } else {
851 eq_map.init_from_chunk(&mut c);
852 }
853
854 let g = pugutils::extract_graph(&eq_map, pug_exact_umi, &log);
855 let s = ahash::RandomState::with_seeds(bc, 7u64, 1u64, 8u64);
858 let pug_stats = pugutils::get_num_molecules(
859 &g,
860 &eq_map,
861 &tid_to_gid,
862 &mut gene_eqc,
863 &s,
864 large_graph_thresh,
865 &log,
866 );
867 alt_resolution = pug_stats.used_alternative_strategy; eq_map.clear();
869
870 let only_unique = (resolution == ResolutionStrategy::Parsimony)
871 || (resolution == ResolutionStrategy::ParsimonyGene);
872
873 match (usa_mode, only_unique) {
876 (true, true) => {
877 counts = afutils::extract_counts(&gene_eqc, num_rows);
879 }
880 (true, false) => {
881 afutils::extract_usa_eqmap(
883 &gene_eqc,
884 num_rows,
885 &mut idx_eq_list,
886 &mut eq_id_count,
887 );
888 counts = em_optimize_subset(
889 &idx_eq_list,
890 &eq_id_count,
891 &mut unique_evidence,
892 &mut no_ambiguity,
893 em_init_type,
894 num_rows,
895 only_unique,
896 usa_offsets,
897 &log,
898 );
899 }
900 (false, _) => {
901 counts = em_optimize(
903 &gene_eqc,
904 &mut unique_evidence,
905 &mut no_ambiguity,
906 em_init_type,
907 num_genes,
908 only_unique,
909 &log,
910 );
911 }
912 }
913 }
914 }
915
916 if num_bootstraps > 0 {
917 bootstraps = run_bootstrap(
918 &gene_eqc,
919 num_bootstraps,
920 &counts,
921 init_uniform,
922 summary_stat,
923 &log,
924 );
925 }
926
927 unique_evidence.fill(false);
932 no_ambiguity.fill(false);
933
934 } else {
936 pugutils::get_num_molecules_cell_ranger_like_small(
939 &mut c,
940 &tid_to_gid,
941 num_genes,
942 &mut gene_eqc,
943 sa_model,
944 &log,
945 );
946 if usa_mode {
948 match resolution {
952 ResolutionStrategy::CellRangerLike
953 | ResolutionStrategy::Parsimony
954 | ResolutionStrategy::ParsimonyGene => {
955 counts = afutils::extract_counts(&gene_eqc, num_rows);
956 }
957 ResolutionStrategy::CellRangerLikeEm
958 | ResolutionStrategy::ParsimonyEm
959 | ResolutionStrategy::ParsimonyGeneEm => {
960 counts =
961 afutils::extract_counts_mm_uniform(&gene_eqc, num_rows);
962 }
963 _ => {
964 counts = vec![0f32; num_genes];
965 warn!(log, "Should not reach here, only cr-like, cr-like-em, parsimony(-gene) and parsimony(-gene)-em are supported in USA-mode.");
966 }
967 }
968 } else {
969 counts = vec![0f32; num_genes];
971 for (k, v) in gene_eqc.iter() {
972 if k.len() == 1 {
973 counts[*k.first().unwrap() as usize] += *v as f32;
974 } else {
975 match resolution {
976 ResolutionStrategy::CellRangerLikeEm
977 | ResolutionStrategy::ParsimonyEm => {
978 let contrib = 1.0 / (k.len() as f32);
979 for g in k.iter() {
980 counts[*g as usize] += contrib;
981 }
982 }
983 _ => {
984 }
986 }
987 }
988 }
989 }
990 if num_bootstraps > 0 {
996 if summary_stat {
1000 bootstraps.push(counts.clone());
1002 bootstraps.push(vec![0f32; num_genes]);
1004 } else {
1005 for _ in 0..num_bootstraps {
1007 bootstraps.push(counts.clone());
1008 }
1009 }
1010 } } if alt_resolution {
1014 alt_res_cells.lock().unwrap().push(cell_num as u64);
1015 }
1016
1017 let mut max_umi = 0.0f32;
1021 let mut sum_umi = 0.0f32;
1022 let mut num_expr: u32 = 0;
1023 expressed_vec.clear();
1024 expressed_ind.clear();
1025
1026 for (gn, c) in counts.iter().enumerate() {
1027 max_umi = if *c > max_umi { *c } else { max_umi };
1028 sum_umi += *c;
1029 if *c > 0.0 {
1030 num_expr += 1;
1031 expressed_vec.push(*c);
1032 expressed_ind.push(gn);
1033 }
1034 }
1035
1036 if num_expr == 0 {
1037 empty_resolved_cells.lock().unwrap().push(cell_num as u64);
1038 }
1039
1040 let num_mapped = nrec;
1041 let dedup_rate = sum_umi / num_mapped as f32;
1042
1043 let num_unmapped = match unmapped_count.get(&bc) {
1044 Some(nu) => *nu,
1045 None => 0u32,
1046 };
1047
1048 let mapping_rate = num_mapped as f32 / (num_mapped + num_unmapped) as f32;
1049
1050 let mean_expr = sum_umi / num_expr as f32;
1052 let num_genes_over_mean = expressed_vec.iter().fold(0u32, |acc, x| {
1054 if x > &mean_expr {
1055 acc + 1u32
1056 } else {
1057 acc
1058 }
1059 });
1060 let mean_by_max = mean_expr / max_umi;
1062
1063 let row_index: usize; {
1065 let bc_mer: BitKmer = (bc, bclen as u8);
1067
1068 if !use_mtx {
1069 eds_bytes = sce::eds::as_bytes(&counts, num_rows)
1070 .expect("can't convert vector to eds");
1071 }
1072
1073 if num_bootstraps > 0 {
1075 if summary_stat {
1077 eds_mean_bytes = sce::eds::as_bytes(&bootstraps[0], num_rows)
1078 .expect("can't convert vector to eds");
1079 eds_var_bytes = sce::eds::as_bytes(&bootstraps[1], num_rows)
1080 .expect("can't convert vector to eds");
1081 } else {
1082 for i in 0..num_bootstraps {
1083 let bt_eds_bytes_slice =
1084 sce::eds::as_bytes(&bootstraps[i as usize], num_rows)
1085 .expect("can't convert vector to eds");
1086 bt_eds_bytes.append(&mut bt_eds_bytes_slice.clone());
1087 }
1088 }
1089 }
1090
1091 let writer_deref = bcout.lock();
1092 let writer = &mut *writer_deref.unwrap();
1093
1094 row_index = writer.row_index;
1096 writer.row_index += 1;
1097
1098 let bc_bytes = &bitmer_to_bytes(bc_mer)[..];
1100 writeln!(&mut writer.barcode_file, "{}", unsafe {
1101 std::str::from_utf8_unchecked(bc_bytes)
1102 })
1103 .expect("can't write to barcode file.");
1104
1105 if !use_mtx {
1107 writer
1109 .eds_file
1110 .write_all(&eds_bytes)
1111 .expect("can't write to matrix file.");
1112 } else {
1113 for (ind, val) in expressed_ind.iter().zip(expressed_vec.iter()) {
1115 writer.trimat.add_triplet(row_index, *ind, *val);
1116 }
1117 }
1118 writeln!(
1119 &mut writer.feature_file,
1120 "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
1121 unsafe { std::str::from_utf8_unchecked(bc_bytes) },
1122 (num_mapped + num_unmapped),
1123 num_mapped,
1124 sum_umi,
1125 mapping_rate,
1126 dedup_rate,
1127 mean_by_max,
1128 num_expr,
1129 num_genes_over_mean
1130 )
1131 .expect("can't write to feature file");
1132
1133 if num_bootstraps > 0 {
1134 if summary_stat {
1135 if let Some((meanf, varf)) =
1136 &mut writer.bootstrap_helper.mean_var_files
1137 {
1138 meanf
1139 .write_all(&eds_mean_bytes)
1140 .expect("can't write to bootstrap mean file.");
1141 varf.write_all(&eds_var_bytes)
1142 .expect("can't write to bootstrap var file.");
1143 }
1144 } else if let Some(bsfile) = &mut writer.bootstrap_helper.bsfile {
1145 bsfile
1146 .write_all(&bt_eds_bytes)
1147 .expect("can't write to bootstrap file");
1148 }
1149 } }
1151
1152 if dump_eq {
1155 let eqmap_deref = eqid_map_lockc.lock();
1156 let geqmap = &mut *eqmap_deref.unwrap();
1157 let mut next_id = geqmap.global_eqc.len() as u64;
1160 for (labels, count) in gene_eqc.iter() {
1161 let mut found = true;
1162 match geqmap.global_eqc.get(labels) {
1163 Some(eqid) => {
1164 geqmap.cell_level_count.push((*eqid, *count));
1165 }
1166 None => {
1167 found = false;
1168 geqmap.cell_level_count.push((next_id, *count));
1169 }
1170 }
1171 if !found {
1172 geqmap.global_eqc.insert(labels.to_vec().clone(), next_id);
1173 next_id += 1;
1174 }
1175 }
1176 geqmap.cell_offset.push((row_index, gene_eqc.len()));
1178 }
1179 gene_eqc.clear();
1181 } } } local_nrec
1185 });
1186
1187 thread_handles.push(handle);
1188 }
1189
1190 let cb = |_new_bytes: u64, new_rec: u64| {
1191 pbar.inc(new_rec);
1192 };
1193
1194 let _ = if let Some(ret_bc) = retained_bc {
1197 let filter_fn = |buf: &[u8], record_context: &AlevinFryRecordContext| -> bool {
1198 let (bc, _umi) =
1199 chunk::Chunk::<AlevinFryReadRecord>::peek_record(&buf[8..], record_context);
1200 ret_bc.contains(&bc)
1201 };
1202 chunk_reader.start_filtered(&mut br, filter_fn, Some(cb))
1203 } else {
1204 chunk_reader.start(&mut br, Some(cb))
1205 };
1206
1207 let gn_path = output_matrix_path.join("quants_mat_cols.txt");
1208 let gn_file = File::create(gn_path).expect("couldn't create gene name file.");
1209 let mut gn_writer = BufWriter::new(gn_file);
1210
1211 if !usa_mode {
1213 for g in gene_names {
1214 gn_writer.write_all(format!("{}\n", g).as_bytes())?;
1215 }
1216 } else {
1217 for g in gene_names.iter() {
1220 gn_writer.write_all(format!("{}\n", *g).as_bytes())?;
1221 }
1222 for g in gene_names.iter() {
1224 gn_writer.write_all(format!("{}-U\n", *g).as_bytes())?;
1225 }
1226 for g in gene_names.iter() {
1228 gn_writer.write_all(format!("{}-A\n", *g).as_bytes())?;
1229 }
1230 }
1231
1232 let mut total_records = 0usize;
1233 for h in thread_handles {
1234 match h.join() {
1235 Ok(rc) => {
1236 total_records += rc;
1237 }
1238 Err(_e) => {
1239 info!(log, "thread panicked");
1240 }
1241 }
1242 }
1243
1244 if use_mtx {
1246 let writer_deref = bc_writer.lock();
1247 let writer = &mut *writer_deref.unwrap();
1248 writer.eds_file.flush().unwrap();
1249 fs::remove_file(&mat_path)?;
1251 let mtx_path = output_matrix_path.join("quants_mat.mtx");
1252 sprs::io::write_matrix_market(mtx_path, &writer.trimat)?;
1253 }
1254
1255 let pb_msg = format!(
1256 "finished quantifying {} cells.",
1257 num_cells.to_formatted_string(&Locale::en)
1258 );
1259 pbar.finish_with_message(pb_msg);
1260
1261 info!(
1262 log,
1263 "processed {} total read records",
1264 total_records.to_formatted_string(&Locale::en)
1265 );
1266
1267 if dump_eq {
1268 write_eqc_counts(&eqid_map_lock, num_rows, usa_mode, &output_matrix_path, log)?;
1269 }
1270
1271 let meta_info = json!({
1272 "cmd" : quant_opts.cmdline,
1273 "version_str": quant_opts.version,
1274 "resolution_strategy" : resolution.to_string(),
1275 "num_quantified_cells" : num_cells,
1276 "num_genes" : num_rows,
1277 "dump_eq" : dump_eq,
1278 "usa_mode" : usa_mode,
1279 "alt_resolved_cell_numbers" : *alt_res_cells.lock().unwrap(),
1280 "empty_resolved_cell_numbers" : *empty_resolved_cells.lock().unwrap(),
1281 "quant_options" : quant_opts
1282 });
1283
1284 let mut meta_info_file =
1285 File::create(output_path.join("quant.json")).expect("couldn't create quant.json file.");
1286 let aux_info_str = serde_json::to_string_pretty(&meta_info).expect("could not format json.");
1287 meta_info_file
1288 .write_all(aux_info_str.as_bytes())
1289 .expect("cannot write to quant.json file");
1290
1291 Ok(())
1307}
1308
1309pub fn velo_quantify(_quant_opts: QuantOpts) -> anyhow::Result<()> {
1312 unimplemented!("not implemented on this branch yet");
1313 }