fastqrab-steps 0.9.1

Pipeline building blocks for fastqrab: read transformations, filters, reports, and demultiplexing
Documentation
use indexmap::IndexMap;

use crate::transformations::prelude::*;
use fastqrab_dna::dna::reverse_complement;
use fastqrab_io::STDIN_MAGIC_PATH;
use fastqrab_io::io::apply_to_read_sequences;

/// Quantify Kmer occurance vs database
#[derive(Clone, JsonSchema)]
#[tpd]
#[derive(Debug)]
pub struct Kmers {
    pub out_label: TagLabel,

    #[schemars(with = "String")]
    #[tpd(adapt_in_verify(String))]
    pub segment: SegmentIndexOrAll,

    // Kmer database configuration
    #[tpd(alias = "files")]
    #[tpd(alias = "filenames")]
    pub filename: Vec<String>,

    pub k: usize,

    #[tpd(alias = "canonical")]
    pub count_reverse_complement: bool,

    pub min_count: usize,

    #[schemars(skip)]
    #[tpd(skip, default)]
    pub resolved_kmer_db: Option<IndexMap<Vec<u8>, usize>>,
}

impl VerifyIn<PartialConfig> for PartialKmers {
    fn verify(
        &mut self,
        parent: &PartialConfig,
        _options: &VerifyOptions,
    ) -> std::result::Result<(), ValidationFailure>
    where
        Self: Sized,
    {
        self.segment.validate_segment(parent);
        self.min_count.or(1);
        self.min_count.verify(|min_count| {
            if *min_count == 0 {
                return Err(ValidationFailure::new(
                    "'min_count' must be greater than 0.",
                    Some("Please specify a positive integer value for min_count (e.g., min_count = 1)."),
                ));
            }
            Ok(())
        });

        self.filename.verify(|filenames| {
            if filenames.is_empty() {
                return Err(ValidationFailure::new(
                    "At least one filename must be provided for the k-mer database.",
                    Some("Please specify the path to your k-mer database file."),
                ));
            }
            if filenames.iter().any(|filepath| filepath.as_ref().expect("Should not be reached on wrong type for filename") == STDIN_MAGIC_PATH) {
                return Err(ValidationFailure::new(
                    "QuantifyKmers: K-mer database cannot be read from stdin",
                    Some("Please specify a file path for the k-mer database instead of using '--stdin--'.")
                ));
            }
            Ok(())
        });

        self.k.verify(|k| {
            if *k == 0 {
                return Err(ValidationFailure::new(
                    "'k' must be greater than 0.",
                    Some("Please specify a positive integer value for k (e.g., k = 5 for 5-mers)."),
                ));
            }
            Ok(())
        });

        Ok(())
    }
}

impl TagUser for PartialTaggedVariant<PartialKmers> {
    fn get_tag_usage(
        &mut self,
        _tags_available: &IndexMap<TagLabel, TagMetadata>,
        _segment_order: &[String],
    ) -> Option<TagUsageInfo<'_>> {
        if let Some(inner) = self.toml_value.value.as_mut() {
            Some(TagUsageInfo {
                declared_tag: inner.out_label.to_declared_tag(TagValueType::Numeric((
                    Some(NonNaN::new(0.0).expect("can't fail")),
                    None,
                ))),
                ..Default::default()
            })
        } else {
            None // cov:excl-line
        }
    }
}

impl Step for Kmers {
    fn init(
        &mut self,
        input_info: &InputInfo,
        _output_files: StepOutputFiles,
        _demultiplex_info: &OptDemultiplex,
    ) -> Result<Option<DemultiplexBarcodes>> {
        let db = build_kmer_database(
            &self.filename,
            self.k,
            self.min_count,
            self.count_reverse_complement,
            input_info.use_rapidgzip,
        )?; // cov:excl-line
        self.resolved_kmer_db = Some(db);

        Ok(None)
    }

    fn apply(
        &self,
        mut block: FastQBlocksCombined,
        _input_info: &crate::transformations::InputInfo,
        _demultiplex_info: &OptDemultiplex,
    ) -> anyhow::Result<(FastQBlocksCombined, bool)> {
        let kmer_db = self
            .resolved_kmer_db
            .as_ref()
            .expect("resolved_kmer_db must be set during initialization");
        let k = self.k;

        super::extract_numeric_tags_plus_all(
            self.segment,
            &self.out_label,
            #[expect(
                clippy::cast_precision_loss,
                reason = "loss is acceptable, it's going to be within u32 range"
            )]
            |read| {
                let count = count_kmers_in_database(read.seq(), k, kmer_db);
                count as f64
            },
            #[expect(
                clippy::cast_precision_loss,
                reason = "loss is acceptable, it's going to be within u32 range"
            )]
            |reads| {
                let total_count: usize = reads
                    .iter()
                    .map(|read| count_kmers_in_database(read.seq(), k, kmer_db))
                    .sum();
                total_count as f64
            },
            &mut block,
        );

        Ok((block, true))
    }
}

pub fn build_kmer_database(
    files: &[String],
    k: usize,
    min_count: usize,
    canonical: bool,
    use_rapidgzip: bool,
) -> Result<IndexMap<Vec<u8>, usize>> {
    let mut kmer_counts: IndexMap<Vec<u8>, usize> = IndexMap::new();

    for file_path in files {
        apply_to_read_sequences(
            file_path,
            &mut |seq: &[u8]| {
                // Extract all kmers from this sequence
                if seq.len() >= k {
                    for i in 0..=(seq.len() - k) {
                        let kmer: Vec<u8> = seq[i..i + k]
                            .iter()
                            .map(|&b| b.to_ascii_uppercase())
                            .collect();

                        // Only count valid DNA sequences (A, C, G, T)
                        if kmer.iter().all(|&b| matches!(b, b'A' | b'C' | b'G' | b'T')) {
                            if canonical {
                                let revcomp = reverse_complement(&kmer);
                                *kmer_counts.entry(revcomp).or_insert(0) += 1;
                            }
                            *kmer_counts.entry(kmer).or_insert(0) += 1;
                        } // cov:excl-line
                    }
                } // cov:excl-line
                Ok(())
            },
            true,
            true, //all reads in BAM.
            use_rapidgzip,
        )
        .with_context(|| format!("Failed to parse kmer database file: {file_path}"))?;
    }

    // Filter by minimum count
    kmer_counts.retain(|_, &mut count| count >= min_count);

    Ok(kmer_counts)
}

/// Count how many kmers from a sequence are in the database
pub fn count_kmers_in_database(
    sequence: &[u8],
    k: usize,
    kmer_db: &IndexMap<Vec<u8>, usize>,
) -> usize {
    if sequence.len() < k {
        return 0;
    }

    let mut count = 0;
    for i in 0..=(sequence.len() - k) {
        let kmer: Vec<u8> = sequence[i..i + k]
            .iter()
            .map(|&b| b.to_ascii_uppercase())
            .collect();

        if kmer_db.contains_key(&kmer) {
            count += 1;
        }
    }

    count
}