use std::collections::HashMap;
use std::io;
use std::path::Path;
use std::sync::Arc;
use rayon::prelude::*;
use crate::cluster::cluster::ClusterResult1D;
use crate::cluster::feature::SimpleFeature;
use crate::cluster::io::load_parquet;
use crate::cluster::peak::ThresholdMode;
use crate::cluster::pseudo::{PseudoSpecOpts, PseudoSpectrum, build_pseudo_spectra_from_pairs, cluster_mz_mu, PseudoFragment};
use crate::cluster::scoring::{assign_ms2_to_best_ms1_by_xic, jaccard_time, ms1_to_ms2_map, query_precursor_scored, query_precursors_scored_par, MatchScoreMode, PrecursorLike, PrecursorSearchIndex, ScoredHit, XicScoreOpts};
use crate::cluster::utility::robust_noise_mad;
use crate::data::dia::{DiaIndex, TimsDatasetDIA};
/// Selects which scoring backend is used when assigning MS2 clusters to MS1 precursors.
pub enum AssignmentMode<'a> {
/// Geometric (RT/IM apex + overlap) scoring, configured by `ScoreOpts`.
Geometric(&'a ScoreOpts),
/// XIC-based scoring, configured by `XicScoreOpts`.
Xic(&'a XicScoreOpts),
}
/// For each MS2 cluster, pick the best MS1 precursor among the candidate
/// `pairs` using geometric scoring.
///
/// Thin delegation to `crate::cluster::scoring::best_ms1_for_each_ms2`.
/// `pairs` holds `(ms2_idx, ms1_idx)` candidate links (see how pairs are
/// consumed in `build_pseudo_spectra_all_pairs`); the returned vector is
/// indexed by MS2 position, `None` where no candidate won.
pub fn best_ms1_for_each_ms2_geom(
ms1: &[ClusterResult1D],
ms2: &[ClusterResult1D],
pairs: &[(usize, usize)],
opts: &ScoreOpts,
) -> Vec<Option<usize>> {
crate::cluster::scoring::best_ms1_for_each_ms2(ms1, ms2, pairs, opts)
}
/// For each MS2 cluster, pick the best MS1 precursor among the candidate
/// `pairs` using XIC-based scoring.
///
/// Returns a vector indexed by MS2 position; entries remain `None` for MS2
/// clusters without a winning precursor.
pub fn best_ms1_for_each_ms2_xic(
    ms1: &[ClusterResult1D],
    ms2: &[ClusterResult1D],
    pairs: &[(usize, usize)],
    opts: &XicScoreOpts,
) -> Vec<Option<usize>> {
    let mut winners: Vec<Option<usize>> = vec![None; ms2.len()];
    // Each triple is (ms2_idx, ms1_idx, score); the score is discarded here.
    for (frag_idx, prec_idx, _score) in assign_ms2_to_best_ms1_by_xic(ms1, ms2, pairs, opts) {
        if let Some(slot) = winners.get_mut(frag_idx) {
            *slot = Some(prec_idx);
        }
    }
    winners
}
/// Dispatch to the geometric or XIC assignment strategy depending on `mode`.
pub fn best_ms1_for_each_ms2_any(
    ms1: &[ClusterResult1D],
    ms2: &[ClusterResult1D],
    pairs: &[(usize, usize)],
    mode: AssignmentMode<'_>,
) -> Vec<Option<usize>> {
    match mode {
        AssignmentMode::Geometric(o) => best_ms1_for_each_ms2_geom(ms1, ms2, pairs, o),
        AssignmentMode::Xic(o) => best_ms1_for_each_ms2_xic(ms1, ms2, pairs, o),
    }
}
/// Filters applied while enumerating candidate (MS1, MS2) pairs and while
/// building fragment indexes.
#[derive(Clone, Debug)]
pub struct CandidateOpts {
/// Minimum RT Jaccard overlap for a pair (0.0 disables the filter).
/// Consumed by `PrecursorSearchIndex` — TODO confirm exact use there.
pub min_rt_jaccard: f32,
/// Seconds added on each side of an MS2 cluster's RT bounds before the
/// span checks in `FragmentIndex::build_with_storage`.
pub ms2_rt_guard_sec: f64,
/// RT bucket width for the precursor search index (presumably seconds —
/// consumed by `PrecursorSearchIndex::build`, not visible here).
pub rt_bucket_width: f64,
/// Optional cap on an MS1 cluster's RT span — consumed by the search index.
pub max_ms1_rt_span_sec: Option<f64>,
/// Optional cap on an MS2 cluster's (guarded) RT span in seconds.
pub max_ms2_rt_span_sec: Option<f64>,
/// Minimum raw summed intensity; either fixed or derived from the MAD
/// noise estimate of all MS2 raw sums.
pub min_raw_sum: ThresholdMode,
/// Optional cap on |RT apex delta| between precursor and fragment (sec).
pub max_rt_apex_delta_sec: Option<f32>,
/// Optional cap on |IM apex delta| in scans.
pub max_scan_apex_delta: Option<usize>,
/// Minimum inclusive IM-window overlap, in scans.
pub min_im_overlap_scans: usize,
/// When true, drop fragments whose m/z falls inside the precursor's own
/// isolation tile.
pub reject_frag_inside_precursor_tile: bool,
}
impl Default for CandidateOpts {
/// Moderately strict defaults: adaptive raw-sum threshold at 3 sigma,
/// 60 s span caps, 2 s RT-apex and 6-scan IM-apex limits.
fn default() -> Self {
Self {
min_rt_jaccard: 0.0,
ms2_rt_guard_sec: 0.0,
rt_bucket_width: 1.0,
max_ms1_rt_span_sec: Some(60.0),
max_ms2_rt_span_sec: Some(60.0),
min_raw_sum: ThresholdMode::AdaptiveNoise(3.0),
max_rt_apex_delta_sec: Some(2.0),
max_scan_apex_delta: Some(6),
min_im_overlap_scans: 1,
reject_frag_inside_precursor_tile: false,
}
}
}
impl CandidateOpts {
    /// Builder-style setter: use a fixed raw-intensity threshold.
    pub fn with_fixed_raw_sum(mut self, val: f32) -> Self {
        self.min_raw_sum = ThresholdMode::Fixed(val);
        self
    }
    /// Builder-style setter: derive the raw-intensity threshold from the
    /// noise estimate, scaled by `sigma_multiplier`.
    pub fn with_adaptive_raw_sum(mut self, sigma_multiplier: f32) -> Self {
        self.min_raw_sum = ThresholdMode::AdaptiveNoise(sigma_multiplier);
        self
    }
    /// Defaults as used before adaptive thresholding: a fixed raw-sum
    /// cutoff of 1.0, everything else as in `Default`.
    pub fn legacy_defaults() -> Self {
        Self::default().with_fixed_raw_sum(1.0)
    }
}
/// Geometric agreement features for one (MS1, MS2) candidate pair, as
/// produced by `build_features`.
#[derive(Clone, Copy, Debug)]
pub struct PairFeatures {
    /// Jaccard overlap of the ±3σ RT intervals, clamped to [0, 1].
    pub jacc_rt: f32,
    /// |RT apex delta| in seconds.
    pub rt_apex_delta_s: f32,
    /// |IM apex delta| in scans.
    pub im_apex_delta_scans: f32,
    /// Inclusive IM-window intersection length, in scans.
    pub im_overlap_scans: u32,
    /// Inclusive IM-window union length, in scans (never 0).
    pub im_union_scans: u32,
    /// Raw summed intensity of the MS1 cluster.
    pub ms1_raw_sum: f32,
    /// True when a finite shape score could be computed.
    pub shape_ok: bool,
    /// RT apex delta in pooled-sigma units.
    pub z_rt: f32,
    /// IM apex delta in pooled-sigma units.
    pub z_im: f32,
    /// Gaussian shape-agreement score in (0, 1].
    pub s_shape: f32,
}
/// Weights and scales for geometric pair scoring. The weight fields are
/// combined by the scoring module (not visible here); only the sigma floors
/// and the inner shape weights are used directly by `build_features`.
#[derive(Clone, Debug)]
pub struct ScoreOpts {
/// Weight of the RT Jaccard overlap term.
pub w_jacc_rt: f32,
/// Weight of the Gaussian shape-agreement term.
pub w_shape: f32,
/// Weight of the RT apex proximity term.
pub w_rt_apex: f32,
/// Weight of the IM apex proximity term.
pub w_im_apex: f32,
/// Weight of the IM overlap term.
pub w_im_overlap: f32,
/// Weight of the MS1 intensity term.
pub w_ms1_intensity: f32,
/// Decay scale (seconds) for the RT apex term — presumably fed to
/// `exp_decay`; confirm against the scoring module.
pub rt_apex_scale_s: f32,
/// Decay scale (scans) for the IM apex term.
pub im_apex_scale_scans: f32,
/// Neutral value substituted when no shape score is available — TODO
/// confirm against the scoring module.
pub shape_neutral: f32,
/// Floor applied to fitted RT sigmas before pooling (see `build_features`).
pub min_sigma_rt: f32,
/// Floor applied to fitted IM sigmas before pooling.
pub min_sigma_im: f32,
/// Inner weight on the squared RT z-term of the shape score.
pub w_shape_rt_inner: f32,
/// Inner weight on the squared IM z-term of the shape score.
pub w_shape_im_inner: f32,
}
impl Default for ScoreOpts {
    /// Default weighting: RT overlap and shape dominate, apex proximity at
    /// 0.75, IM overlap at 0.5, MS1 intensity at 0.25.
    fn default() -> Self {
        Self {
            w_jacc_rt: 1.0,
            w_shape: 1.0,
            w_rt_apex: 0.75,
            w_im_apex: 0.75,
            w_im_overlap: 0.5,
            w_ms1_intensity: 0.25,
            rt_apex_scale_s: 0.75,
            im_apex_scale_scans: 3.0,
            shape_neutral: 0.6,
            min_sigma_rt: 0.05,
            min_sigma_im: 0.5,
            w_shape_rt_inner: 1.0,
            w_shape_im_inner: 1.0,
        }
    }
}
/// Compute the geometric agreement features for one (MS1, MS2) candidate pair.
///
/// RT overlap is measured between the ±3σ intervals around each fitted RT
/// apex. The shape score is a Gaussian in the sigma-normalized RT and IM apex
/// deltas, with per-axis inner weights taken from `opts`; when either pooled
/// sigma is unusable, `shape_ok` is false and the z/shape fields stay 0.
#[inline]
pub(crate) fn build_features(ms1: &ClusterResult1D, ms2: &ClusterResult1D, opts: &ScoreOpts) -> PairFeatures {
// ±3σ RT support interval of the MS1 cluster (computed in f64).
let (rt1_lo, rt1_hi) = (
ms1.rt_fit.mu as f64 - (ms1.rt_fit.sigma as f64) * 3.0,
ms1.rt_fit.mu as f64 + (ms1.rt_fit.sigma as f64) * 3.0,
);
// ±3σ RT support interval of the MS2 cluster.
let (rt2_lo, rt2_hi) = (
ms2.rt_fit.mu as f64 - (ms2.rt_fit.sigma as f64) * 3.0,
ms2.rt_fit.mu as f64 + (ms2.rt_fit.sigma as f64) * 3.0,
);
let jacc_rt = jaccard_time(rt1_lo, rt1_hi, rt2_lo, rt2_hi).clamp(0.0, 1.0);
let rt_apex_delta_s = (ms1.rt_fit.mu - ms2.rt_fit.mu).abs();
let im_apex_delta_scans = (ms1.im_fit.mu - ms2.im_fit.mu).abs();
let (im_inter, im_union) = im_overlap_and_union(ms1.im_window, ms2.im_window);
// Floor the fitted sigmas so degenerate fits cannot blow up the z-scores.
let s1_rt = ms1.rt_fit.sigma.max(opts.min_sigma_rt);
let s2_rt = ms2.rt_fit.sigma.max(opts.min_sigma_rt);
let s1_im = ms1.im_fit.sigma.max(opts.min_sigma_im);
let s2_im = ms2.im_fit.sigma.max(opts.min_sigma_im);
let (mut shape_ok, mut z_rt, mut z_im, mut s_shape) = (false, 0.0, 0.0, 0.0);
if let (Some(sig_rt), Some(sig_im)) = (pooled_sigma(s1_rt, s2_rt), pooled_sigma(s1_im, s2_im)) {
if sig_rt.is_finite() && sig_rt > 0.0 && sig_im.is_finite() && sig_im > 0.0 {
z_rt = rt_apex_delta_s / sig_rt;
z_im = im_apex_delta_scans / sig_im;
// Weighted Gaussian kernel over both z-scores.
let q = -0.5_f32 * (opts.w_shape_rt_inner * z_rt * z_rt
+ opts.w_shape_im_inner * z_im * z_im);
s_shape = q.exp(); shape_ok = s_shape.is_finite();
}
}
PairFeatures {
jacc_rt,
rt_apex_delta_s,
im_apex_delta_scans,
im_overlap_scans: im_inter,
im_union_scans: im_union,
ms1_raw_sum: ms1.raw_sum,
shape_ok,
z_rt,
z_im,
s_shape,
}
}
/// Intersection and union lengths (inclusive, in scans) of two IM windows.
/// The union is clamped to a minimum of 1 so callers can divide by it safely;
/// an inverted window (hi < lo) contributes length 0.
#[inline]
fn im_overlap_and_union(a: (usize, usize), b: (usize, usize)) -> (u32, u32) {
    // Inclusive length of a window, 0 when inverted.
    let len = |w: (usize, usize)| if w.1 >= w.0 { (w.1 - w.0 + 1) as u32 } else { 0 };
    let (lo, hi) = (a.0.max(b.0), a.1.min(b.1));
    let inter = if hi >= lo { (hi - lo + 1) as u32 } else { 0 };
    let union = len(a) + len(b) - inter;
    (inter, union.max(1))
}
/// Quadrature-pooled sigma `sqrt(s1² + s2²)`; `None` when the pooled variance
/// is non-finite or non-positive (e.g. both inputs zero, or NaN/inf inputs).
#[inline]
fn pooled_sigma(s1: f32, s2: f32) -> Option<f32> {
    let var = s1 * s1 + s2 * s2;
    if var > 0.0 && var.is_finite() {
        Some(var.sqrt())
    } else {
        None
    }
}
/// Exponential falloff `exp(-delta / scale)`; returns 0.0 for non-finite
/// inputs or a non-positive scale.
#[inline]
pub(crate) fn exp_decay(delta: f32, scale: f32) -> f32 {
    let usable = delta.is_finite() && scale.is_finite() && scale > 0.0;
    if usable { (-delta / scale).exp() } else { 0.0 }
}
/// Natural log of `1 + x` for finite, non-negative `x`; anything else maps
/// to 0.0.
///
/// Computed in f64 via `ln_1p`, which stays accurate for very small `x`.
/// The previous `(1.0 + x as f64).ln()` form rounded `1.0 + x` to exactly
/// 1.0 once `x` dropped below ~1e-16, collapsing the result to 0.
#[inline]
pub(crate) fn safe_log1p(x: f32) -> f32 {
    if x.is_finite() && x >= 0.0 {
        (x as f64).ln_1p() as f32
    } else {
        0.0
    }
}
/// Outcome of matching MS2 clusters to MS1 precursors.
#[derive(Clone, Debug)]
pub struct AssignmentResult {
/// Candidate `(ms2_idx, ms1_idx)` pairs that passed the candidate filters.
pub pairs: Vec<(usize, usize)>,
/// Winning MS1 index per MS2 cluster; `None` where nothing was assigned.
pub ms2_best_ms1: Vec<Option<usize>>,
/// Inverse mapping: for each MS1 index, the MS2 indices linked to it.
pub ms1_to_ms2: Vec<Vec<usize>>,
}
/// Assignment bookkeeping plus the pseudo spectra built from it.
#[derive(Clone, Debug)]
pub struct PseudoBuildResult {
/// Which MS2 clusters were linked to which MS1 precursors.
pub assignment: AssignmentResult,
/// Pseudo spectra constructed from the (winning or all) pairs.
pub pseudo_spectra: Vec<PseudoSpectrum>,
}
/// Build pseudo spectra from ALL candidate (MS2, MS1) pairs, without picking
/// a single best precursor per fragment.
///
/// Candidate filtering is deliberately permissive (no span/apex caps, fixed
/// raw-sum threshold of 1.0), except that fragments falling inside their
/// precursor's isolation tile are rejected. `ms2_best_ms1` in the returned
/// assignment is therefore all `None`; only `pairs` and `ms1_to_ms2` carry
/// information.
pub fn build_pseudo_spectra_all_pairs(
ds: &TimsDatasetDIA,
ms1: &[ClusterResult1D],
ms2: &[ClusterResult1D],
features: Option<&[SimpleFeature]>,
pseudo_opts: &PseudoSpecOpts,
) -> PseudoBuildResult {
// Permissive candidate options: keep every geometric candidate, only
// reject fragments inside the precursor's own tile.
let cand_opts = CandidateOpts {
min_rt_jaccard: 0.0,
ms2_rt_guard_sec: 0.0,
rt_bucket_width: 1.0,
max_ms1_rt_span_sec: None,
max_ms2_rt_span_sec: None,
min_raw_sum: ThresholdMode::Fixed(1.0),
max_rt_apex_delta_sec: None,
max_scan_apex_delta: None,
min_im_overlap_scans: 1,
reject_frag_inside_precursor_tile: true,
};
let idx = PrecursorSearchIndex::build(ds, ms1, &cand_opts);
let pairs = idx.enumerate_pairs(ms1, ms2, &cand_opts);
if pairs.is_empty() {
// Nothing matched: return an empty but correctly sized result.
return PseudoBuildResult {
assignment: AssignmentResult {
pairs,
ms2_best_ms1: vec![None; ms2.len()],
ms1_to_ms2: vec![Vec::new(); ms1.len()],
},
pseudo_spectra: Vec::new(),
};
}
let empty_feats: &[SimpleFeature] = &[];
let feat_slice = features.unwrap_or(empty_feats);
let pseudo_spectra = build_pseudo_spectra_from_pairs(
ms1,
ms2,
feat_slice,
&pairs,
pseudo_opts,
);
// Invert the pair list into an MS1 -> [MS2] adjacency; pairs are (ms2, ms1).
let mut ms1_to_ms2: Vec<Vec<usize>> = vec![Vec::new(); ms1.len()];
for (ms2_idx, ms1_idx) in &pairs {
if *ms1_idx < ms1_to_ms2.len() {
ms1_to_ms2[*ms1_idx].push(*ms2_idx);
}
}
// No single-best assignment is computed in this mode.
let ms2_best_ms1 = vec![None; ms2.len()];
let assignment = AssignmentResult {
pairs,
ms2_best_ms1,
ms1_to_ms2,
};
PseudoBuildResult {
assignment,
pseudo_spectra,
}
}
/// Full pipeline with geometric scoring: enumerate candidate pairs, pick the
/// single best MS1 per MS2, then build pseudo spectra from the winning pairs
/// only. The returned `assignment.pairs` still contains ALL candidate pairs.
pub fn build_pseudo_spectra_end_to_end(
ds: &TimsDatasetDIA,
ms1: &[ClusterResult1D],
ms2: &[ClusterResult1D],
features: Option<&[SimpleFeature]>,
cand_opts: &CandidateOpts,
score_opts: &ScoreOpts,
pseudo_opts: &PseudoSpecOpts,
) -> PseudoBuildResult {
let idx = PrecursorSearchIndex::build(ds, ms1, cand_opts);
let pairs = idx.enumerate_pairs(ms1, ms2, cand_opts);
if pairs.is_empty() {
// Nothing matched: return an empty but correctly sized result.
return PseudoBuildResult {
assignment: AssignmentResult {
pairs,
ms2_best_ms1: vec![None; ms2.len()],
ms1_to_ms2: vec![Vec::new(); ms1.len()],
},
pseudo_spectra: Vec::new(),
};
}
// One winning MS1 per MS2 (geometric score), then the inverse mapping.
let ms2_best_ms1 = best_ms1_for_each_ms2_geom(
ms1,
ms2,
&pairs,
score_opts,
);
let ms1_to_ms2 = ms1_to_ms2_map(
ms1.len(),
&ms2_best_ms1,
);
let assignment = AssignmentResult {
pairs,
ms2_best_ms1: ms2_best_ms1.clone(),
ms1_to_ms2: ms1_to_ms2.clone(),
};
// Re-derive (ms2, ms1) pairs containing only the winners.
let mut winning_pairs: Vec<(usize, usize)> = Vec::new();
for (ms1_idx, js) in ms1_to_ms2.iter().enumerate() {
for &ms2_idx in js {
winning_pairs.push((ms2_idx, ms1_idx));
}
}
// Debug-only invariant: each MS2 index may appear at most once.
debug_assert!({
use std::collections::HashSet;
let mut seen = HashSet::new();
for (ms2_idx, _) in &winning_pairs {
if !seen.insert(ms2_idx) {
panic!("Duplicated ms2");
}
}
true
});
let empty_feats: &[SimpleFeature] = &[];
let feat_slice = features.unwrap_or(empty_feats);
let pseudo_spectra = build_pseudo_spectra_from_pairs(
ms1,
ms2,
feat_slice,
&winning_pairs,
pseudo_opts,
);
PseudoBuildResult {
assignment,
pseudo_spectra,
}
}
/// Full pipeline with XIC scoring: enumerate candidate pairs, pick the single
/// best MS1 per MS2 via XIC correlation, then build pseudo spectra from the
/// winning pairs only. The returned `assignment.pairs` still contains ALL
/// candidate pairs.
pub fn build_pseudo_spectra_end_to_end_xic(
ds: &TimsDatasetDIA,
ms1: &[ClusterResult1D],
ms2: &[ClusterResult1D],
features: Option<&[SimpleFeature]>,
cand_opts: &CandidateOpts,
xic_opts: &XicScoreOpts,
pseudo_opts: &PseudoSpecOpts,
) -> PseudoBuildResult {
let idx = PrecursorSearchIndex::build(ds, ms1, cand_opts);
let pairs = idx.enumerate_pairs(ms1, ms2, cand_opts);
if pairs.is_empty() {
// Nothing matched: return an empty but correctly sized result.
return PseudoBuildResult {
assignment: AssignmentResult {
pairs,
ms2_best_ms1: vec![None; ms2.len()],
ms1_to_ms2: vec![Vec::new(); ms1.len()],
},
pseudo_spectra: Vec::new(),
};
}
// One winning MS1 per MS2 (XIC score), then the inverse mapping.
let ms2_best_ms1 = best_ms1_for_each_ms2_xic(
ms1,
ms2,
&pairs,
xic_opts,
);
let ms1_to_ms2 = ms1_to_ms2_map(
ms1.len(),
&ms2_best_ms1,
);
let assignment = AssignmentResult {
pairs,
ms2_best_ms1: ms2_best_ms1.clone(),
ms1_to_ms2: ms1_to_ms2.clone(),
};
// Re-derive (ms2, ms1) pairs containing only the winners.
let mut winning_pairs: Vec<(usize, usize)> = Vec::new();
for (ms1_idx, js) in ms1_to_ms2.iter().enumerate() {
for &ms2_idx in js {
winning_pairs.push((ms2_idx, ms1_idx));
}
}
// Debug-only invariant: each MS2 index may appear at most once.
debug_assert!({
use std::collections::HashSet;
let mut seen = HashSet::new();
for (ms2_idx, _) in &winning_pairs {
if !seen.insert(ms2_idx) {
panic!("Duplicated ms2");
}
}
true
});
let empty_feats: &[SimpleFeature] = &[];
let feat_slice = features.unwrap_or(empty_feats);
let pseudo_spectra = build_pseudo_spectra_from_pairs(
ms1,
ms2,
feat_slice,
&winning_pairs,
pseudo_opts,
);
PseudoBuildResult {
assignment,
pseudo_spectra,
}
}
/// Column-oriented (struct-of-arrays) snapshot of per-fragment metadata from a
/// `FragmentIndex`. All vectors are parallel and indexed by fragment position.
#[derive(Clone, Debug, Default)]
pub struct FragmentMetadata {
/// IM apex per fragment (may be NaN when none could be derived).
pub im_mu: Vec<f32>,
/// Inclusive IM scan window per fragment.
pub im_windows: Vec<(usize, usize)>,
/// True for fragments that passed the index's keep filters.
pub keep: Vec<bool>,
/// Cluster id per fragment; also defines the logical length.
pub cluster_ids: Vec<u64>,
/// m/z per fragment (may be NaN when none could be derived).
pub mz_mu: Vec<f32>,
}
impl FragmentMetadata {
    /// Number of fragment entries (driven by `cluster_ids`).
    #[inline]
    pub fn len(&self) -> usize {
        self.cluster_ids.len()
    }
    /// True when no fragments are tracked.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.cluster_ids.is_empty()
    }
    /// Number of fragments whose `keep` flag is set.
    #[inline]
    pub fn count_valid(&self) -> usize {
        self.keep.iter().filter(|flag| **flag).count()
    }
    /// Row view of entry `idx`, or `None` when out of range.
    #[inline]
    pub fn get(&self, idx: usize) -> Option<FragmentMetadataEntry> {
        if idx < self.len() {
            Some(FragmentMetadataEntry {
                im_mu: self.im_mu[idx],
                im_window: self.im_windows[idx],
                keep: self.keep[idx],
                cluster_id: self.cluster_ids[idx],
                mz_mu: self.mz_mu[idx],
            })
        } else {
            None
        }
    }
}
/// Row view over one fragment's metadata (see `FragmentMetadata::get`).
#[derive(Clone, Copy, Debug)]
pub struct FragmentMetadataEntry {
/// IM apex (may be NaN).
pub im_mu: f32,
/// Inclusive IM scan window.
pub im_window: (usize, usize),
/// Whether the fragment passed the index's keep filters.
pub keep: bool,
/// The fragment's cluster id.
pub cluster_id: u64,
/// m/z (may be NaN).
pub mz_mu: f32,
}
/// Shared, immutable cluster storage with an id -> position lookup.
/// Cloning is cheap: the cluster slice is behind an `Arc`.
#[derive(Clone, Debug)]
pub struct FragmentStorage {
/// The clusters themselves, shared and immutable.
pub clusters: Arc<[ClusterResult1D]>,
/// Maps each `cluster_id` to its position in `clusters`.
pub id_to_idx: HashMap<u64, usize>,
}
impl FragmentStorage {
pub fn from_vec(clusters: Vec<ClusterResult1D>) -> Self {
let id_to_idx: HashMap<u64, usize> = clusters
.iter()
.enumerate()
.map(|(i, c)| (c.cluster_id, i))
.collect();
Self {
clusters: clusters.into(),
id_to_idx,
}
}
#[inline]
pub fn get(&self, idx: usize) -> Option<&ClusterResult1D> {
self.clusters.get(idx)
}
#[inline]
pub fn get_by_id(&self, id: u64) -> Option<&ClusterResult1D> {
self.id_to_idx.get(&id).and_then(|&i| self.clusters.get(i))
}
#[inline]
pub fn index_of(&self, id: u64) -> Option<usize> {
self.id_to_idx.get(&id).copied()
}
#[inline]
pub fn len(&self) -> usize {
self.clusters.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.clusters.is_empty()
}
#[inline]
pub fn iter(&self) -> impl Iterator<Item = &ClusterResult1D> {
self.clusters.iter()
}
}
/// Per-DIA-window-group candidate list. The two vectors are parallel and
/// sorted ascending by `rt_apex`, so RT ranges can be binary-searched.
#[derive(Clone, Debug)]
pub struct FragmentGroupIndex {
/// Positions into the index's MS2 storage.
pub ms2_indices: Vec<usize>,
/// RT apex (seconds) for each entry in `ms2_indices`.
pub rt_apex: Vec<f32>,
}
/// Filters applied when querying a `FragmentIndex` for one precursor.
#[derive(Clone, Debug)]
pub struct FragmentQueryOpts {
/// Optional cap on |RT apex delta| in seconds (also narrows the binary
/// search window). Non-positive or `None` disables the RT restriction.
pub max_rt_apex_delta_sec: Option<f32>,
/// Optional cap on |IM apex delta| in scans; fragments without a finite
/// IM apex are rejected when this is set.
pub max_scan_apex_delta: Option<usize>,
/// Minimum inclusive IM-window overlap, in scans.
pub min_im_overlap_scans: usize,
/// Require precursor and fragment to share at least one isolation tile.
pub require_tile_compat: bool,
/// Reject fragments whose m/z falls inside the precursor's own tile.
pub reject_frag_inside_precursor_tile: bool,
}
impl Default for FragmentQueryOpts {
/// Defaults mirroring `CandidateOpts::default()` for the shared limits
/// (2 s RT apex, 6-scan IM apex, 1-scan overlap), with tile compatibility
/// required.
fn default() -> Self {
Self {
max_rt_apex_delta_sec: Some(2.0),
max_scan_apex_delta: Some(6),
min_im_overlap_scans: 1,
require_tile_compat: true,
reject_frag_inside_precursor_tile: false,
}
}
}
/// Queryable index over MS2 (fragment) clusters, bucketed by DIA window group
/// and sorted by RT apex for range queries. The `ms2_*` vectors are parallel
/// to `ms2_storage`.
#[derive(Clone, Debug)]
pub struct FragmentIndex {
/// DIA acquisition metadata: frame times and isolation-tile geometry.
dia_index: Arc<DiaIndex>,
/// All MS2 clusters (shared, immutable).
ms2_storage: Arc<[ClusterResult1D]>,
/// IM apex per cluster (NaN when none could be derived).
ms2_im_mu: Vec<f32>,
/// Inclusive IM scan window per cluster.
ms2_im_windows: Vec<(usize, usize)>,
/// Keep mask computed in `build_with_storage`.
ms2_keep: Vec<bool>,
/// Cluster id per cluster.
ms2_cluster_ids: Vec<u64>,
/// m/z per cluster (NaN when none could be derived).
ms2_mz_mu: Vec<f32>,
/// Kept clusters bucketed by window group, sorted by RT apex.
by_group: HashMap<u32, FragmentGroupIndex>,
/// Maps `cluster_id` to position in `ms2_storage`.
id_to_idx: HashMap<u64, usize>,
}
impl FragmentIndex {
/// All MS2 clusters backing this index, in storage order.
#[inline]
pub fn ms2_slice(&self) -> &[ClusterResult1D] {
&self.ms2_storage
}
/// Cluster at storage position `idx`, if in range.
#[inline]
pub fn get_ms2_by_index(&self, idx: usize) -> Option<&ClusterResult1D> {
self.ms2_storage.get(idx)
}
/// Cluster with the given cluster id, if known.
#[inline]
pub fn get_ms2_by_cluster_id(&self, cid: u64) -> Option<&ClusterResult1D> {
self.id_to_idx.get(&cid).and_then(|&i| self.ms2_storage.get(i))
}
/// Snapshot of the per-fragment metadata columns (clones the vectors).
#[inline]
pub fn metadata(&self) -> FragmentMetadata {
FragmentMetadata {
im_mu: self.ms2_im_mu.clone(),
im_windows: self.ms2_im_windows.clone(),
keep: self.ms2_keep.clone(),
cluster_ids: self.ms2_cluster_ids.clone(),
mz_mu: self.ms2_mz_mu.clone(),
}
}
/// Storage view sharing the cluster slice (cheap `Arc` clone; the id map
/// is cloned).
#[inline]
pub fn storage(&self) -> FragmentStorage {
FragmentStorage {
clusters: Arc::clone(&self.ms2_storage),
id_to_idx: self.id_to_idx.clone(),
}
}
/// The DIA acquisition metadata this index was built against.
#[inline]
pub fn dia_index(&self) -> &Arc<DiaIndex> {
&self.dia_index
}
/// Per-group candidate list, if the group has any kept clusters.
#[inline]
pub fn group_index(&self, group: u32) -> Option<&FragmentGroupIndex> {
self.by_group.get(&group)
}
/// All window groups with at least one kept cluster (unordered).
#[inline]
pub fn groups(&self) -> Vec<u32> {
self.by_group.keys().copied().collect()
}
/// Number of clusters that passed the keep filters.
#[inline]
pub fn count_valid(&self) -> usize {
self.ms2_keep.iter().filter(|&&k| k).count()
}
/// Total number of stored clusters (kept or not).
#[inline]
pub fn len(&self) -> usize {
self.ms2_storage.len()
}
/// True when the index holds no clusters at all.
#[inline]
pub fn is_empty(&self) -> bool {
self.ms2_storage.is_empty()
}
/// Core constructor: precompute per-MS2 metadata (RT bounds, IM/RT/mz apexes,
/// keep mask) and the per-group RT-sorted candidate lists.
fn build_with_storage(
dia_index: Arc<DiaIndex>,
ms2_storage: Arc<[ClusterResult1D]>,
opts: &CandidateOpts,
) -> Self {
let ms2: &[ClusterResult1D] = &ms2_storage;
// frame id -> acquisition time (presumably seconds — TODO confirm units).
let frame_time = &dia_index.frame_time;
// RT time bounds per cluster from the frames it actually used; falls back
// to interpreting the rt_window endpoints as a frame-id range.
let ms2_time_bounds: Vec<(f64, f64)> = ms2
.par_iter()
.map(|c| {
let mut t_lo = f64::INFINITY;
let mut t_hi = f64::NEG_INFINITY;
if !c.frame_ids_used.is_empty() {
for &fid in &c.frame_ids_used {
if let Some(&t) = frame_time.get(&fid) {
if t < t_lo { t_lo = t; }
if t > t_hi { t_hi = t; }
}
}
}
if !t_lo.is_finite() || !t_hi.is_finite() {
// Fallback: treat the rt_window endpoints as frame ids.
let (rt_lo, rt_hi) = c.rt_window;
if rt_hi >= rt_lo {
for fid in rt_lo as u32..=rt_hi as u32 {
if let Some(&t) = frame_time.get(&fid) {
if t < t_lo { t_lo = t; }
if t > t_hi { t_hi = t; }
}
}
}
}
(t_lo, t_hi)
})
.collect();
// Noise-adaptive intensity cutoff estimated (MAD) from all MS2 raw sums.
let raw_sums: Vec<f32> = ms2
.iter()
.filter(|c| c.ms_level == 2)
.map(|c| c.raw_sum)
.collect();
let raw_sum_noise = robust_noise_mad(&raw_sums);
let effective_min_raw_sum = opts.min_raw_sum.effective(raw_sum_noise);
// Keep mask: MS level 2, has a window group, passes the intensity cutoff,
// and has finite, positive-span RT bounds within the optional span cap.
let ms2_keep: Vec<bool> = ms2
.par_iter()
.enumerate()
.map(|(i, c)| {
if c.ms_level != 2 {
return false;
}
if c.window_group.is_none() {
return false;
}
if c.raw_sum < effective_min_raw_sum {
return false;
}
let (mut t0, mut t1) = ms2_time_bounds[i];
// Widen the bounds by the configured guard before the span check.
if t0.is_finite() {
t0 -= opts.ms2_rt_guard_sec;
}
if t1.is_finite() {
t1 += opts.ms2_rt_guard_sec;
}
if !(t0.is_finite() && t1.is_finite() && t1 > t0) {
return false;
}
if let Some(max_rt) = opts.max_ms2_rt_span_sec {
if (t1 - t0) > max_rt {
return false;
}
}
true
})
.collect();
let ms2_im_windows: Vec<(usize, usize)> =
ms2.iter().map(|c| c.im_window).collect();
// IM apex per cluster: fitted mu, else the IM-window midpoint, else NaN.
let ms2_im_mu: Vec<f32> = ms2
.iter()
.map(|c| {
let mu = c.im_fit.mu;
if mu.is_finite() && mu > 0.0 {
mu
} else {
let (lo, hi) = c.im_window;
if hi > lo {
((lo + hi) as f32) * 0.5
} else {
f32::NAN
}
}
})
.collect();
let ms2_cluster_ids: Vec<u64> =
ms2.iter().map(|c| c.cluster_id).collect();
// RT apex per cluster: fitted mu, else the midpoint of the time bounds,
// else NaN (such clusters are excluded from the group index below).
let ms2_rt_apex: Vec<f32> = ms2
.iter()
.enumerate()
.map(|(i, c)| {
let mu = c.rt_fit.mu;
if mu.is_finite() && mu > 0.0 {
mu
} else {
let (t0, t1) = ms2_time_bounds[i];
if t0.is_finite() && t1.is_finite() && t1 > t0 {
((t0 + t1) * 0.5) as f32
} else {
f32::NAN
}
}
})
.collect();
// m/z per cluster: fitted value, else the m/z-window midpoint, else NaN.
let ms2_mz_mu: Vec<f32> = ms2
.iter()
.map(|c| {
if let Some(m) = cluster_mz_mu(c) {
if m.is_finite() && m > 0.0 {
m
} else {
match c.mz_window {
Some((lo, hi)) if lo.is_finite() && hi.is_finite() && hi > lo => {
0.5 * (lo + hi) as f32
}
_ => f32::NAN,
}
}
} else {
match c.mz_window {
Some((lo, hi)) if lo.is_finite() && hi.is_finite() && hi > lo => {
0.5 * (lo + hi) as f32
}
_ => f32::NAN,
}
}
})
.collect();
// Bucket kept clusters with a finite RT apex by DIA window group.
let mut by_group_raw: HashMap<u32, Vec<(f32, usize)>> = HashMap::new();
for (j, c2) in ms2.iter().enumerate() {
if !ms2_keep[j] {
continue;
}
let g = match c2.window_group {
Some(g) => g,
None => continue,
};
let rt = ms2_rt_apex[j];
if !rt.is_finite() {
continue;
}
by_group_raw.entry(g).or_default().push((rt, j));
}
// Sort each group by RT apex so queries can binary-search a time window.
let mut by_group: HashMap<u32, FragmentGroupIndex> = HashMap::new();
for (g, mut v) in by_group_raw {
v.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
let (rt_apex, ms2_indices): (Vec<f32>, Vec<usize>) =
v.into_iter().map(|(rt, j)| (rt, j)).unzip();
by_group.insert(g, FragmentGroupIndex { ms2_indices, rt_apex });
}
let id_to_idx: HashMap<u64, usize> = ms2_cluster_ids
.iter()
.enumerate()
.map(|(i, &cid)| (cid, i))
.collect();
FragmentIndex {
dia_index,
ms2_storage,
ms2_im_mu,
ms2_im_windows,
ms2_keep,
ms2_cluster_ids,
ms2_mz_mu,
by_group,
id_to_idx,
}
}
/// Load MS2 clusters from one parquet file and build an index over them.
///
/// Errors if the path is not valid UTF-8 or if `load_parquet` fails.
pub fn from_parquet_file(
dia_index: Arc<DiaIndex>,
parquet_path: impl AsRef<Path>,
opts: &CandidateOpts,
) -> io::Result<Self> {
let path_str = parquet_path
.as_ref()
.to_str()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "non-UTF8 path"))?;
let clusters = load_parquet(path_str)?;
Ok(Self::from_owned(dia_index, clusters, opts))
}
/// Load and concatenate every `*.parquet` file directly inside `dir`
/// (non-recursive), then build a single index over the combined clusters.
///
/// Errors on directory/file I/O failures or non-UTF-8 paths.
pub fn from_parquet_dir(
dia_index: Arc<DiaIndex>,
dir: impl AsRef<Path>,
opts: &CandidateOpts,
) -> io::Result<Self> {
let dir = dir.as_ref();
let mut all: Vec<ClusterResult1D> = Vec::new();
for entry in std::fs::read_dir(dir)? {
let entry = entry?;
// Skip subdirectories and anything without a .parquet extension.
if !entry.file_type()?.is_file() {
continue;
}
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) != Some("parquet") {
continue;
}
let path_str = path
.to_str()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "non-UTF8 path"))?;
let mut part = load_parquet(path_str)?;
all.append(&mut part);
}
Ok(Self::from_owned(dia_index, all, opts))
}
/// Build an index from a borrowed slice of MS2 clusters (the clusters are
/// copied into shared storage).
pub fn from_slice(
    dia_index: Arc<DiaIndex>,
    ms2: &[ClusterResult1D],
    opts: &CandidateOpts,
) -> Self {
    Self::build_with_storage(dia_index, Arc::from(ms2.to_vec()), opts)
}
/// Build an index taking ownership of the MS2 clusters (no copy).
pub fn from_owned(
    dia_index: Arc<DiaIndex>,
    ms2: Vec<ClusterResult1D>,
    opts: &CandidateOpts,
) -> Self {
    Self::build_with_storage(dia_index, Arc::from(ms2), opts)
}
/// Best-effort RT apex (seconds) for a precursor cluster.
///
/// Preference order: finite positive fitted `rt_fit.mu`; otherwise the mean
/// of the finite frame times over `frame_ids_used`; otherwise the mean frame
/// time over the `rt_window` range interpreted as frame ids; `None` when all
/// three fail.
fn precursor_rt_apex_sec(&self, prec: &ClusterResult1D) -> Option<f32> {
    let mu = prec.rt_fit.mu;
    if mu.is_finite() && mu > 0.0 {
        return Some(mu);
    }
    let ft = &self.dia_index.frame_time;
    // Fallback 1: average the finite frame times of the frames actually used.
    let (mut acc, mut cnt) = (0.0f64, 0usize);
    for fid in &prec.frame_ids_used {
        if let Some(&t) = ft.get(fid) {
            if t.is_finite() {
                acc += t;
                cnt += 1;
            }
        }
    }
    if cnt > 0 {
        return Some((acc / cnt as f64) as f32);
    }
    // Fallback 2: average frame times across the rt_window, treating the
    // window endpoints as a frame-id range.
    let (rt_lo, rt_hi) = prec.rt_window;
    if rt_hi < rt_lo {
        return None;
    }
    let (mut acc, mut cnt) = (0.0f64, 0usize);
    for fid in rt_lo as u32..=rt_hi as u32 {
        if let Some(&t) = ft.get(&fid) {
            if t.is_finite() {
                acc += t;
                cnt += 1;
            }
        }
    }
    if cnt > 0 {
        Some((acc / cnt as f64) as f32)
    } else {
        None
    }
}
/// Collect the cluster ids of all fragments compatible with `prec`.
///
/// Filters, in order: IM-window overlap, optional IM apex delta, and — when
/// tile checks are enabled — compatibility with the isolation tiles of the
/// window group (optionally rejecting fragments whose m/z lies inside the
/// precursor's own tile). `groups` restricts the search; when `None` or
/// empty, groups are derived from the precursor m/z and IM position.
/// The result is sorted and deduplicated.
pub fn query_precursor(
&self,
prec: &ClusterResult1D,
groups: Option<&[u32]>,
opts: &FragmentQueryOpts,
) -> Vec<u64> {
let mut out = Vec::new();
// Precursor m/z: fitted value, else the m/z-window midpoint, else give up.
let prec_mz = if let Some(m) = cluster_mz_mu(prec) {
if m.is_finite() && m > 0.0 {
m
} else {
match prec.mz_window {
Some((lo, hi)) if lo.is_finite() && hi.is_finite() && hi > lo => {
0.5 * (lo + hi)
}
_ => return out, }
}
} else {
match prec.mz_window {
Some((lo, hi)) if lo.is_finite() && hi.is_finite() && hi > lo => {
0.5 * (lo + hi)
}
_ => return out,
}
};
// Precursor RT apex (seconds); no apex means no query.
let prec_rt = match self.precursor_rt_apex_sec(prec) {
Some(t) => t,
None => return out,
};
// Precursor IM apex: fitted mu, else the IM-window midpoint.
let prec_im = if prec.im_fit.mu.is_finite() {
prec.im_fit.mu
} else {
let (lo, hi) = prec.im_window;
if hi > lo {
((lo + hi) as f32) * 0.5
} else {
return out;
}
};
let prec_im_win = prec.im_window;
let prec_scan_win = (prec_im_win.0 as u32, prec_im_win.1 as u32);
let groups: Vec<u32> = match groups {
Some(gs) if !gs.is_empty() => gs.to_vec(),
_ => self.dia_index.groups_for_precursor(prec_mz, prec_im),
};
if groups.is_empty() {
return Vec::new();
}
let require_tile = opts.require_tile_compat;
let reject_inside = opts.reject_frag_inside_precursor_tile;
for g in groups {
let fg = match self.by_group.get(&g) {
Some(fg) => fg,
None => continue,
};
// Binary-search the RT-sorted candidates down to prec_rt ± max delta.
let (lo_idx, hi_idx) = if let Some(dt) = opts.max_rt_apex_delta_sec {
if dt > 0.0 {
let lo_t = prec_rt - dt;
let hi_t = prec_rt + dt;
(
lower_bound(&fg.rt_apex, lo_t),
upper_bound(&fg.rt_apex, hi_t),
)
} else {
(0, fg.ms2_indices.len())
}
} else {
(0, fg.ms2_indices.len())
};
let slices = self.dia_index.program_slices_for_group(g);
'ms2_loop: for k in lo_idx..hi_idx {
let j = fg.ms2_indices[k];
if !self.ms2_keep[j] {
continue;
}
let im2 = self.ms2_im_windows[j];
let frag_scan_win = (im2.0 as u32, im2.1 as u32);
// Inclusive IM-window overlap in scans.
let im_overlap = {
let lo = prec_im_win.0.max(im2.0);
let hi = prec_im_win.1.min(im2.1);
hi.saturating_sub(lo).saturating_add(1)
};
if im_overlap < opts.min_im_overlap_scans {
continue;
}
if let Some(max_d) = opts.max_scan_apex_delta {
let s2 = self.ms2_im_mu[j];
if s2.is_finite() {
// NOTE: the `as f32` cast is a no-op (both operands are f32).
let d = (prec_im - s2).abs() as f32;
if d > max_d as f32 {
continue 'ms2_loop;
}
} else {
// No usable fragment IM apex: the delta cannot be verified.
continue 'ms2_loop;
}
}
if require_tile || reject_inside {
let mut tile_compatible = false;
let mut inside_prec_tile = false;
let frag_mz = self.ms2_mz_mu[j];
for s in &slices {
let tile_scans = (s.scan_lo, s.scan_hi);
let prec_hits_tile = ranges_overlap_u32(prec_scan_win, tile_scans);
let frag_hits_tile = ranges_overlap_u32(frag_scan_win, tile_scans);
if !(prec_hits_tile && frag_hits_tile) {
continue;
}
tile_compatible = true;
// Flag fragments whose m/z is inside the precursor's own
// tile (presumably unfragmented precursor signal — the
// rationale lives with the callers setting this option).
if reject_inside && frag_mz.is_finite() {
let prec_in_tile =
prec_mz >= s.mz_lo as f32 && prec_mz <= s.mz_hi as f32;
let frag_in_tile =
frag_mz >= s.mz_lo as f32 && frag_mz <= s.mz_hi as f32;
if prec_in_tile && frag_in_tile {
inside_prec_tile = true;
break;
}
}
}
if require_tile && !tile_compatible {
continue 'ms2_loop;
}
if reject_inside && inside_prec_tile {
continue 'ms2_loop;
}
}
out.push(self.ms2_cluster_ids[j]);
}
}
// A cluster can be reached via several groups; sort + dedup the ids.
out.sort_unstable();
out.dedup();
out
}
/// Query every precursor against the index in parallel.
///
/// Builds a dedicated rayon thread pool with `num_threads` workers (0 lets
/// rayon choose its default) and returns, per precursor, the sorted and
/// deduplicated fragment cluster ids from `query_precursor`.
pub fn query_precursors_par(
    &self,
    precursors: &[ClusterResult1D],
    opts: &FragmentQueryOpts,
    num_threads: usize,
) -> Vec<Vec<u64>> {
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .build()
        // Pool construction fails only on resource exhaustion; give that
        // failure a diagnosable message instead of a bare unwrap.
        .expect("failed to build rayon thread pool for precursor queries");
    pool.install(|| {
        precursors
            .par_iter()
            .map(|prec| self.query_precursor(prec, None, opts))
            .collect()
    })
}
/// Enumerate candidate fragment positions (indices into the MS2 storage) for
/// a thin precursor. Applies the same filters as `query_precursor` — IM
/// overlap, optional IM apex delta, tile compatibility — but returns sorted,
/// deduplicated indices instead of cluster ids and takes the precursor's
/// m/z, RT, and IM apexes as precomputed values.
pub fn enumerate_candidates_for_precursor_thin(
&self,
prec: &ThinPrecursor,
window_groups: Option<&[u32]>,
opts: &FragmentQueryOpts,
) -> Vec<usize> {
let mut out = Vec::<usize>::new();
let prec_mz = prec.mz_mu;
let prec_rt = prec.rt_mu;
let prec_im = prec.im_mu;
let prec_im_win = prec.im_window;
let prec_scan_win = (prec_im_win.0 as u32, prec_im_win.1 as u32);
// Explicit group list wins; otherwise derive from m/z and IM position.
let groups: Vec<u32> = match window_groups {
Some(gs) if !gs.is_empty() => gs.to_vec(),
_ => self.dia_index.groups_for_precursor(prec_mz, prec_im),
};
if groups.is_empty() {
return out;
}
let require_tile = opts.require_tile_compat;
let reject_inside = opts.reject_frag_inside_precursor_tile;
for g in groups {
let fg = match self.by_group.get(&g) {
Some(fg) => fg,
None => continue,
};
// Binary-search the RT-sorted candidates down to prec_rt ± max delta.
let (lo_idx, hi_idx) = if let Some(dt) = opts.max_rt_apex_delta_sec {
if dt > 0.0 {
let lo_t = prec_rt - dt;
let hi_t = prec_rt + dt;
(
lower_bound(&fg.rt_apex, lo_t),
upper_bound(&fg.rt_apex, hi_t),
)
} else {
(0, fg.ms2_indices.len())
}
} else {
(0, fg.ms2_indices.len())
};
let slices = self.dia_index.program_slices_for_group(g);
'ms2_loop: for k in lo_idx..hi_idx {
let j = fg.ms2_indices[k];
if !self.ms2_keep[j] {
continue;
}
let im2 = self.ms2_im_windows[j];
let frag_scan_win = (im2.0 as u32, im2.1 as u32);
// Inclusive IM-window overlap in scans.
let im_overlap = {
let lo = prec_im_win.0.max(im2.0);
let hi = prec_im_win.1.min(im2.1);
hi.saturating_sub(lo).saturating_add(1)
};
if im_overlap < opts.min_im_overlap_scans {
continue;
}
if let Some(max_d) = opts.max_scan_apex_delta {
let s2 = self.ms2_im_mu[j];
if s2.is_finite() {
// NOTE: the `as f32` cast is a no-op (both operands are f32).
let d = (prec_im - s2).abs() as f32;
if d > max_d as f32 {
continue 'ms2_loop;
}
} else {
// No usable fragment IM apex: the delta cannot be verified.
continue 'ms2_loop;
}
}
if require_tile || reject_inside {
let mut tile_compatible = false;
let mut inside_prec_tile = false;
let frag_mz = self.ms2_mz_mu[j];
for s in &slices {
let tile_scans = (s.scan_lo, s.scan_hi);
let prec_hits_tile = ranges_overlap_u32(prec_scan_win, tile_scans);
let frag_hits_tile = ranges_overlap_u32(frag_scan_win, tile_scans);
if !(prec_hits_tile && frag_hits_tile) {
continue;
}
tile_compatible = true;
// Flag fragments whose m/z is inside the precursor's own
// tile (presumably unfragmented precursor signal — the
// rationale lives with the callers setting this option).
if reject_inside && frag_mz.is_finite() {
let prec_in_tile =
prec_mz >= s.mz_lo as f32 && prec_mz <= s.mz_hi as f32;
let frag_in_tile =
frag_mz >= s.mz_lo as f32 && frag_mz <= s.mz_hi as f32;
if prec_in_tile && frag_in_tile {
inside_prec_tile = true;
break;
}
}
}
if require_tile && !tile_compatible {
continue 'ms2_loop;
}
if reject_inside && inside_prec_tile {
continue 'ms2_loop;
}
}
out.push(j);
}
}
// A cluster can be reached via several groups; sort + dedup the indices.
out.sort_unstable();
out.dedup();
out
}
/// Candidate enumeration for a batch of optional thin precursors; entries
/// that are `None` yield empty candidate lists.
fn enumerate_candidates_for_precursors_thin(
    &self,
    precs: &[Option<ThinPrecursor>],
    opts: &FragmentQueryOpts,
) -> Vec<Vec<usize>> {
    precs
        .iter()
        .map(|maybe_prec| match maybe_prec {
            Some(p) => self.enumerate_candidates_for_precursor_thin(p, None, opts),
            None => Vec::new(),
        })
        .collect()
}
/// Candidate fragment positions for a feature, using its highest-intensity
/// member cluster as the representative precursor. Empty when the feature
/// has no members or no thin precursor can be derived.
pub fn enumerate_candidates_for_feature(
    &self,
    feat: &SimpleFeature,
    opts: &FragmentQueryOpts,
) -> Vec<usize> {
    feature_representative_cluster(feat)
        .and_then(|cluster| self.make_thin_precursor(cluster))
        .map(|thin| self.enumerate_candidates_for_precursor_thin(&thin, None, opts))
        .unwrap_or_default()
}
/// Scored fragment query for one precursor cluster: enumerate candidates via
/// the thin projection, then hand scoring to the scoring module with the
/// requested mode and `min_score` cutoff. Empty when no thin precursor can
/// be derived.
pub fn query_precursor_scored(
    &self,
    prec: &ClusterResult1D,
    window_groups: Option<&[u32]>,
    opts: &FragmentQueryOpts,
    mode: MatchScoreMode,
    geom_opts: &ScoreOpts,
    xic_opts: &XicScoreOpts,
    min_score: f32,
) -> Vec<ScoredHit> {
    let thin = match self.make_thin_precursor(prec) {
        Some(t) => t,
        None => return Vec::new(),
    };
    let candidates = self.enumerate_candidates_for_precursor_thin(&thin, window_groups, opts);
    query_precursor_scored(
        PrecursorLike::Cluster(prec),
        self.ms2_slice(),
        &candidates,
        mode,
        geom_opts,
        xic_opts,
        min_score,
    )
}
/// Scored fragment query for many precursor clusters, run in parallel on the
/// current rayon pool. Precursors that cannot be projected to a thin
/// representation simply get no candidates.
pub fn query_precursors_scored_par(
    &self,
    precs: &[ClusterResult1D],
    opts: &FragmentQueryOpts,
    mode: MatchScoreMode,
    geom_opts: &ScoreOpts,
    xic_opts: &XicScoreOpts,
    min_score: f32,
) -> Vec<Vec<ScoredHit>> {
    let thin: Vec<Option<ThinPrecursor>> =
        precs.iter().map(|c| self.make_thin_precursor(c)).collect();
    let candidates = self.enumerate_candidates_for_precursors_thin(&thin, opts);
    let wrapped: Vec<PrecursorLike<'_>> =
        precs.iter().map(PrecursorLike::Cluster).collect();
    query_precursors_scored_par(
        &wrapped,
        self.ms2_slice(),
        &candidates,
        mode,
        geom_opts,
        xic_opts,
        min_score,
    )
}
/// Candidate enumeration for a batch of features; features without a usable
/// representative cluster get empty candidate lists.
fn enumerate_candidates_for_features(
    &self,
    feats: &[SimpleFeature],
    opts: &FragmentQueryOpts,
) -> Vec<Vec<usize>> {
    feats
        .iter()
        .map(|feat| {
            feature_representative_cluster(feat)
                .and_then(|cluster| self.make_thin_precursor(cluster))
                .map(|thin| self.enumerate_candidates_for_precursor_thin(&thin, None, opts))
                .unwrap_or_default()
        })
        .collect()
}
/// Scored fragment query for one feature: candidates come from the feature's
/// representative cluster; scoring is delegated with the requested mode and
/// `min_score` forwarded to the scorer.
pub fn query_feature_scored(
    &self,
    feat: &SimpleFeature,
    opts: &FragmentQueryOpts,
    mode: MatchScoreMode,
    geom_opts: &ScoreOpts,
    xic_opts: &XicScoreOpts,
    min_score: f32,
) -> Vec<ScoredHit> {
    let candidates = self.enumerate_candidates_for_feature(feat, opts);
    query_precursor_scored(
        PrecursorLike::Feature(feat),
        self.ms2_slice(),
        &candidates,
        mode,
        geom_opts,
        xic_opts,
        min_score,
    )
}
/// Scored fragment query for many features, run in parallel on the current
/// rayon pool (inside `query_precursors_scored_par`).
pub fn query_features_scored_par(
    &self,
    feats: &[SimpleFeature],
    opts: &FragmentQueryOpts,
    mode: MatchScoreMode,
    geom_opts: &ScoreOpts,
    xic_opts: &XicScoreOpts,
    min_score: f32,
) -> Vec<Vec<ScoredHit>> {
    let candidates = self.enumerate_candidates_for_features(feats, opts);
    let wrapped: Vec<PrecursorLike<'_>> =
        feats.iter().map(PrecursorLike::Feature).collect();
    query_precursors_scored_par(
        &wrapped,
        self.ms2_slice(),
        &candidates,
        mode,
        geom_opts,
        xic_opts,
        min_score,
    )
}
/// Project a full precursor cluster down to the thin representation used by
/// candidate enumeration.
///
/// m/z prefers the fitted value (finite and positive) and falls back to the
/// m/z-window midpoint; RT uses `precursor_rt_apex_sec`; IM prefers the
/// fitted apex and falls back to the IM-window midpoint. `None` when any of
/// the three cannot be derived.
pub fn make_thin_precursor(&self, prec: &ClusterResult1D) -> Option<ThinPrecursor> {
    // Window-midpoint fallback for the m/z, valid only for a finite window.
    let window_mid = prec.mz_window.and_then(|(lo, hi)| {
        if lo.is_finite() && hi.is_finite() && hi > lo {
            Some(0.5 * (lo + hi))
        } else {
            None
        }
    });
    let mz_mu = cluster_mz_mu(prec)
        .filter(|m| m.is_finite() && *m > 0.0)
        .or(window_mid)?;
    let rt_mu = self.precursor_rt_apex_sec(prec)?;
    let im_window = prec.im_window;
    let im_mu = if prec.im_fit.mu.is_finite() {
        prec.im_fit.mu
    } else if im_window.1 > im_window.0 {
        ((im_window.0 + im_window.1) as f32) * 0.5
    } else {
        return None;
    };
    Some(ThinPrecursor {
        mz_mu,
        rt_mu,
        im_mu,
        im_window,
    })
}
}
/// Minimal precursor representation used by candidate enumeration: just the
/// derived m/z, RT, and IM apexes plus the IM scan window.
#[derive(Clone, Debug)]
pub struct ThinPrecursor {
/// Precursor m/z (fitted or window midpoint).
pub mz_mu: f32,
/// RT apex in seconds.
pub rt_mu: f32,
/// IM apex in scans (fitted or window midpoint).
pub im_mu: f32,
/// Inclusive IM scan window.
pub im_window: (usize, usize),
}
/// Index of the first element >= `x` in the ascending slice `xs`
/// (`xs.len()` when every element is smaller).
fn lower_bound(xs: &[f32], x: f32) -> usize {
    // partition_point performs the same binary search as the hand-rolled
    // loop: count the prefix of elements strictly below x.
    xs.partition_point(|&v| v < x)
}
/// True when the inclusive u32 ranges `a` and `b` share at least one value.
#[inline]
fn ranges_overlap_u32(a: (u32, u32), b: (u32, u32)) -> bool {
    a.0.max(b.0) <= a.1.min(b.1)
}
/// Index of the first element > `x` in the ascending slice `xs`
/// (`xs.len()` when no element is greater).
fn upper_bound(xs: &[f32], x: f32) -> usize {
    // Count the prefix of elements less than or equal to x.
    xs.partition_point(|&v| v <= x)
}
/// Convert an MS2 cluster into a `PseudoFragment`.
///
/// The m/z prefers the fitted value (finite, positive) and falls back to the
/// m/z-window midpoint; `None` when neither yields a finite m/z.
/// `ms2_cluster_index` is left at 0 for the caller to fill in; a missing
/// window group maps to 0.
pub fn fragment_from_cluster(c: &ClusterResult1D) -> Option<PseudoFragment> {
    let fitted = c
        .mz_fit
        .as_ref()
        .map(|fit| fit.mu)
        .filter(|mu| mu.is_finite() && *mu > 0.0);
    let mz = match fitted {
        Some(mu) => mu,
        None => {
            let (lo, hi) = c.mz_window?;
            0.5 * (lo + hi)
        }
    };
    if !mz.is_finite() {
        return None;
    }
    Some(PseudoFragment {
        mz,
        intensity: c.raw_sum,
        ms2_cluster_index: 0,
        ms2_cluster_id: c.cluster_id,
        window_group: c.window_group.unwrap_or(0),
    })
}
/// Pick the member cluster with the highest raw intensity to represent the
/// whole feature; `None` for a feature with no member clusters.
///
/// The previous explicit `is_empty` guard was redundant — `max_by` already
/// yields `None` on an empty iterator — and the explicit lifetime is covered
/// by elision. On ties (or NaN comparisons, which fall back to `Equal`) the
/// last maximal element wins, exactly as before.
fn feature_representative_cluster(feat: &SimpleFeature) -> Option<&ClusterResult1D> {
    feat.member_clusters
        .iter()
        .max_by(|a, b| a.raw_sum.partial_cmp(&b.raw_sum).unwrap_or(std::cmp::Ordering::Equal))
}