ngs/derive/command/
instrument.rs

1//! Functionality relating to the `ngs derive instrument` subcommand itself.
2
3use anyhow::bail;
4use std::collections::HashSet;
5use std::path::PathBuf;
6use std::thread;
7
8use clap::Args;
9use tracing::info;
10
11use crate::derive::instrument::compute;
12use crate::derive::instrument::reads::IlluminaReadName;
13use crate::utils::formats::bam::ParsedBAMFile;
14use crate::utils::formats::utils::IndexCheck;
15
16/// Clap arguments for the `ngs derive instrument` subcommand.
17#[derive(Args)]
18pub struct DeriveInstrumentArgs {
19    // Source BAM.
20    #[arg(value_name = "BAM")]
21    src: PathBuf,
22
23    /// Only examine the first n records in the file.
24    #[arg(short, long, value_name = "USIZE")]
25    num_records: Option<usize>,
26
27    /// Use a specific number of threads.
28    #[arg(short, long, value_name = "USIZE")]
29    threads: Option<usize>,
30}
31
32/// Entrypoint for the `ngs derive instrument` subcommand.
33pub fn derive(args: DeriveInstrumentArgs) -> anyhow::Result<()> {
34    let first_n_reads: Option<usize> = args.num_records;
35    let threads = match args.threads {
36        Some(t) => t,
37        None => thread::available_parallelism().map(usize::from)?,
38    };
39
40    info!(
41        "Starting derive instrument subcommand with {} threads.",
42        threads
43    );
44
45    let rt = tokio::runtime::Builder::new_multi_thread()
46        .worker_threads(threads)
47        .build()?;
48
49    rt.block_on(app(args.src, first_n_reads))
50}
51
52/// Main function for the `ngs derive instrument` subcommand.
53async fn app(src: PathBuf, first_n_reads: Option<usize>) -> anyhow::Result<()> {
54    let mut instrument_names = HashSet::new();
55    let mut flowcell_names = HashSet::new();
56
57    let ParsedBAMFile {
58        mut reader, header, ..
59    } = crate::utils::formats::bam::open_and_parse(src, IndexCheck::Full)?;
60
61    // (1) Collect instrument names and flowcell names from reads within the
62    // file. Support for sampling only a portion of the reads is provided.
63    let mut samples = 0;
64    let mut sample_max = 0;
65
66    if let Some(s) = first_n_reads {
67        sample_max = s;
68    }
69
70    for result in reader.records(&header.parsed) {
71        let record = result?;
72
73        if let Some(read_name) = record.read_name() {
74            let name: &str = read_name.as_ref();
75
76            match name.parse::<IlluminaReadName>() {
77                Ok(read) => {
78                    instrument_names.insert(read.instrument_name);
79                    if let Some(fc) = read.flowcell {
80                        flowcell_names.insert(fc);
81                    }
82                }
83                Err(_) => {
84                    bail!(
85                        "Could not parse Illumina-formatted query names for read: {}",
86                        name
87                    );
88                }
89            }
90        }
91
92        if sample_max > 0 {
93            samples += 1;
94            if samples > sample_max {
95                break;
96            }
97        }
98    }
99
100    // (2) Derive the predict instrument results based on these detected
101    // instrument names and flowcell names.
102    let result = compute::predict(instrument_names, flowcell_names);
103
104    // (3) Print the output to stdout as JSON (more support for different output
105    // types may be added in the future, but for now, only JSON).
106    let output = serde_json::to_string_pretty(&result).unwrap();
107    print!("{}", output);
108
109    Ok(())
110}