Skip to main content

openproteo_core/
arrow.rs

1//! Apache Arrow bridge for [`crate::SpectrumRecord`].
2//!
3//! This module is gated behind the `arrow` Cargo feature. It exposes:
4//!
5//! * [`spectrum_record_schema`]: the canonical Arrow [`Schema`] for a
6//!   stream of `SpectrumRecord`s.
7//! * [`SpectrumBatchBuilder`]: a streaming builder that accumulates rows
8//!   and produces a [`RecordBatch`] when finalized.
9//!
10//! The schema is intentionally flat (one row per spectrum). Peak arrays
11//! are stored as `LargeList<Float64>` / `LargeList<Float32>` so that
12//! individual spectra can exceed `i32::MAX` peaks without truncation.
13//! Precursor fields are inlined as nullable scalar columns; an MS1
14//! spectrum has all precursor columns null. This shape is friendly to
15//! Polars / DataFusion / DuckDB consumers.
16//!
17//! The Arrow schema is part of this crate's stable surface for the
18//! purposes of consumers that pin `openproteo-core` directly; any column
19//! addition is a minor-version bump, any removal or rename is breaking.
20
21use std::sync::Arc;
22
23use arrow_array::builder::{
24    ArrayBuilder, Float32Builder, Float64Builder, Int32Builder, LargeListBuilder, StringBuilder,
25    UInt32Builder, UInt8Builder,
26};
27use arrow_array::{ArrayRef, RecordBatch};
28use arrow_schema::{DataType, Field, Schema, SchemaRef};
29
30use crate::{Activation, Analyzer, MobilityArrayKind, Polarity, ScanMode, SpectrumRecord};
31
32/// Return the canonical [`Schema`] for a `RecordBatch` of spectra.
33pub fn spectrum_record_schema() -> SchemaRef {
34    let mz_item = Arc::new(Field::new("item", DataType::Float64, false));
35    let int_item = Arc::new(Field::new("item", DataType::Float32, false));
36    let mob_item = Arc::new(Field::new("item", DataType::Float32, false));
37    Arc::new(Schema::new(vec![
38        Field::new("index", DataType::UInt32, false),
39        Field::new("scan_number", DataType::UInt32, false),
40        Field::new("native_id", DataType::Utf8, false),
41        Field::new("ms_level", DataType::UInt8, false),
42        Field::new("polarity", DataType::Utf8, true),
43        Field::new("scan_mode", DataType::Utf8, true),
44        Field::new("analyzer", DataType::Utf8, true),
45        Field::new("filter", DataType::Utf8, true),
46        Field::new("retention_time_sec", DataType::Float64, false),
47        Field::new("total_ion_current", DataType::Float64, true),
48        Field::new("base_peak_mz", DataType::Float64, true),
49        Field::new("base_peak_intensity", DataType::Float64, true),
50        Field::new("low_mz", DataType::Float64, true),
51        Field::new("high_mz", DataType::Float64, true),
52        Field::new("ion_injection_time_ms", DataType::Float64, true),
53        Field::new("inv_mobility", DataType::Float64, true),
54        Field::new("precursor_target_mz", DataType::Float64, true),
55        Field::new("precursor_selected_mz", DataType::Float64, true),
56        Field::new("precursor_isolation_width", DataType::Float64, true),
57        Field::new("precursor_charge", DataType::Int32, true),
58        Field::new("precursor_intensity", DataType::Float64, true),
59        Field::new("precursor_collision_energy", DataType::Float64, true),
60        Field::new("precursor_ce_is_nce", DataType::UInt8, true),
61        Field::new("precursor_native_id", DataType::Utf8, true),
62        Field::new("precursor_activation", DataType::Utf8, true),
63        Field::new("precursor_analyzer", DataType::Utf8, true),
64        Field::new_large_list("mz", mz_item, false),
65        Field::new_large_list("intensity", int_item, false),
66        Field::new_large_list("inv_mobility_per_peak", mob_item, true),
67        Field::new("mobility_array_kind", DataType::Utf8, true),
68    ]))
69}
70
71fn polarity_str(p: Polarity) -> &'static str {
72    match p {
73        Polarity::Positive => "positive",
74        Polarity::Negative => "negative",
75    }
76}
77
78fn scan_mode_str(m: ScanMode) -> &'static str {
79    match m {
80        ScanMode::Profile => "profile",
81        ScanMode::Centroid => "centroid",
82    }
83}
84
85fn analyzer_str(a: Analyzer) -> &'static str {
86    match a {
87        Analyzer::ITMS => "itms",
88        Analyzer::TQMS => "tqms",
89        Analyzer::SQMS => "sqms",
90        Analyzer::TOFMS => "tof",
91        Analyzer::FTMS => "ftms",
92        Analyzer::Sector => "sector",
93    }
94}
95
96fn activation_str(a: Activation) -> &'static str {
97    match a {
98        Activation::CID => "cid",
99        Activation::HCD => "hcd",
100        Activation::ETD => "etd",
101        Activation::ECD => "ecd",
102        Activation::UVPD => "uvpd",
103        Activation::PQD => "pqd",
104        Activation::PD => "pd",
105        Activation::SID => "sid",
106        Activation::EThcD => "ethcd",
107        Activation::IRMPD => "irmpd",
108        Activation::MPID => "mpid",
109    }
110}
111
112fn mobility_kind_str(k: MobilityArrayKind) -> &'static str {
113    match k {
114        MobilityArrayKind::InverseReducedVsPerCm2 => "inverse_reduced_k0",
115        MobilityArrayKind::DriftTimeMilliseconds => "drift_time_ms",
116    }
117}
118
119/// Streaming builder that accumulates `SpectrumRecord`s and produces a
120/// single Arrow [`RecordBatch`] when finalized.
121///
122/// All rows in a batch share one `mobility_array_kind` value, recorded
123/// once at construction. Push rows with [`push`](Self::push); call
124/// [`finish`](Self::finish) to materialize the batch.
125pub struct SpectrumBatchBuilder {
126    schema: SchemaRef,
127    mobility_kind: Option<MobilityArrayKind>,
128    index: UInt32Builder,
129    scan_number: UInt32Builder,
130    native_id: StringBuilder,
131    ms_level: UInt8Builder,
132    polarity: StringBuilder,
133    scan_mode: StringBuilder,
134    analyzer: StringBuilder,
135    filter: StringBuilder,
136    retention_time_sec: Float64Builder,
137    total_ion_current: Float64Builder,
138    base_peak_mz: Float64Builder,
139    base_peak_intensity: Float64Builder,
140    low_mz: Float64Builder,
141    high_mz: Float64Builder,
142    ion_injection_time_ms: Float64Builder,
143    inv_mobility: Float64Builder,
144    precursor_target_mz: Float64Builder,
145    precursor_selected_mz: Float64Builder,
146    precursor_isolation_width: Float64Builder,
147    precursor_charge: Int32Builder,
148    precursor_intensity: Float64Builder,
149    precursor_collision_energy: Float64Builder,
150    precursor_ce_is_nce: UInt8Builder,
151    precursor_native_id: StringBuilder,
152    precursor_activation: StringBuilder,
153    precursor_analyzer: StringBuilder,
154    mz: LargeListBuilder<Float64Builder>,
155    intensity: LargeListBuilder<Float32Builder>,
156    inv_mobility_per_peak: LargeListBuilder<Float32Builder>,
157    mobility_array_kind_col: StringBuilder,
158}
159
160impl SpectrumBatchBuilder {
161    /// Create a new builder. Pass the `mobility_array_kind` from the
162    /// source's [`crate::RunMetadata`] so the resulting Arrow batch
163    /// carries the unit/CV interpretation alongside the data.
164    pub fn new(mobility_kind: Option<MobilityArrayKind>) -> Self {
165        Self {
166            schema: spectrum_record_schema(),
167            mobility_kind,
168            index: UInt32Builder::new(),
169            scan_number: UInt32Builder::new(),
170            native_id: StringBuilder::new(),
171            ms_level: UInt8Builder::new(),
172            polarity: StringBuilder::new(),
173            scan_mode: StringBuilder::new(),
174            analyzer: StringBuilder::new(),
175            filter: StringBuilder::new(),
176            retention_time_sec: Float64Builder::new(),
177            total_ion_current: Float64Builder::new(),
178            base_peak_mz: Float64Builder::new(),
179            base_peak_intensity: Float64Builder::new(),
180            low_mz: Float64Builder::new(),
181            high_mz: Float64Builder::new(),
182            ion_injection_time_ms: Float64Builder::new(),
183            inv_mobility: Float64Builder::new(),
184            precursor_target_mz: Float64Builder::new(),
185            precursor_selected_mz: Float64Builder::new(),
186            precursor_isolation_width: Float64Builder::new(),
187            precursor_charge: Int32Builder::new(),
188            precursor_intensity: Float64Builder::new(),
189            precursor_collision_energy: Float64Builder::new(),
190            precursor_ce_is_nce: UInt8Builder::new(),
191            precursor_native_id: StringBuilder::new(),
192            precursor_activation: StringBuilder::new(),
193            precursor_analyzer: StringBuilder::new(),
194            mz: LargeListBuilder::new(Float64Builder::new()).with_field(Arc::new(Field::new(
195                "item",
196                DataType::Float64,
197                false,
198            ))),
199            intensity: LargeListBuilder::new(Float32Builder::new())
200                .with_field(Arc::new(Field::new("item", DataType::Float32, false))),
201            inv_mobility_per_peak: LargeListBuilder::new(Float32Builder::new())
202                .with_field(Arc::new(Field::new("item", DataType::Float32, false))),
203            mobility_array_kind_col: StringBuilder::new(),
204        }
205    }
206
207    /// Schema for the batch produced by this builder.
208    pub fn schema(&self) -> SchemaRef {
209        self.schema.clone()
210    }
211
212    /// Append one spectrum row.
213    pub fn push(&mut self, rec: &SpectrumRecord) {
214        self.index.append_value(rec.index as u32);
215        self.scan_number.append_value(rec.scan_number);
216        self.native_id.append_value(&rec.native_id);
217        self.ms_level.append_value(rec.ms_level as u8);
218        self.polarity.append_option(rec.polarity.map(polarity_str));
219        self.scan_mode
220            .append_option(rec.scan_mode.map(scan_mode_str));
221        self.analyzer.append_option(rec.analyzer.map(analyzer_str));
222        self.filter.append_option(rec.filter.as_deref());
223        self.retention_time_sec.append_value(rec.retention_time_sec);
224        self.total_ion_current.append_option(rec.total_ion_current);
225        self.base_peak_mz.append_option(rec.base_peak_mz);
226        self.base_peak_intensity
227            .append_option(rec.base_peak_intensity);
228        self.low_mz.append_option(rec.low_mz);
229        self.high_mz.append_option(rec.high_mz);
230        self.ion_injection_time_ms
231            .append_option(rec.ion_injection_time_ms);
232        self.inv_mobility.append_option(rec.inv_mobility);
233
234        match &rec.precursor {
235            Some(p) => {
236                self.precursor_target_mz.append_option(p.target_mz);
237                self.precursor_selected_mz.append_option(p.selected_mz);
238                self.precursor_isolation_width
239                    .append_option(p.isolation_width);
240                self.precursor_charge.append_option(p.charge);
241                self.precursor_intensity.append_option(p.intensity);
242                self.precursor_collision_energy
243                    .append_option(p.collision_energy);
244                self.precursor_ce_is_nce.append_value(u8::from(p.ce_is_nce));
245                self.precursor_native_id
246                    .append_option(p.precursor_native_id.as_deref());
247                self.precursor_activation
248                    .append_option(p.activation.map(activation_str));
249                self.precursor_analyzer
250                    .append_option(p.analyzer.map(analyzer_str));
251            }
252            None => {
253                self.precursor_target_mz.append_null();
254                self.precursor_selected_mz.append_null();
255                self.precursor_isolation_width.append_null();
256                self.precursor_charge.append_null();
257                self.precursor_intensity.append_null();
258                self.precursor_collision_energy.append_null();
259                self.precursor_ce_is_nce.append_null();
260                self.precursor_native_id.append_null();
261                self.precursor_activation.append_null();
262                self.precursor_analyzer.append_null();
263            }
264        }
265
266        for &v in &rec.mz {
267            self.mz.values().append_value(v);
268        }
269        self.mz.append(true);
270        for &v in &rec.intensity {
271            self.intensity.values().append_value(v);
272        }
273        self.intensity.append(true);
274        match &rec.inv_mobility_per_peak {
275            Some(mob) => {
276                for &v in mob {
277                    self.inv_mobility_per_peak.values().append_value(v);
278                }
279                self.inv_mobility_per_peak.append(true);
280            }
281            None => self.inv_mobility_per_peak.append(false),
282        }
283        self.mobility_array_kind_col
284            .append_option(self.mobility_kind.map(mobility_kind_str));
285    }
286
287    /// Number of rows accumulated so far.
288    pub fn len(&self) -> usize {
289        self.index.len()
290    }
291
292    /// `true` if no rows have been pushed.
293    pub fn is_empty(&self) -> bool {
294        self.len() == 0
295    }
296
297    /// Materialize the accumulated rows into a [`RecordBatch`].
298    pub fn finish(mut self) -> Result<RecordBatch, arrow_schema::ArrowError> {
299        let arrays: Vec<ArrayRef> = vec![
300            Arc::new(self.index.finish()),
301            Arc::new(self.scan_number.finish()),
302            Arc::new(self.native_id.finish()),
303            Arc::new(self.ms_level.finish()),
304            Arc::new(self.polarity.finish()),
305            Arc::new(self.scan_mode.finish()),
306            Arc::new(self.analyzer.finish()),
307            Arc::new(self.filter.finish()),
308            Arc::new(self.retention_time_sec.finish()),
309            Arc::new(self.total_ion_current.finish()),
310            Arc::new(self.base_peak_mz.finish()),
311            Arc::new(self.base_peak_intensity.finish()),
312            Arc::new(self.low_mz.finish()),
313            Arc::new(self.high_mz.finish()),
314            Arc::new(self.ion_injection_time_ms.finish()),
315            Arc::new(self.inv_mobility.finish()),
316            Arc::new(self.precursor_target_mz.finish()),
317            Arc::new(self.precursor_selected_mz.finish()),
318            Arc::new(self.precursor_isolation_width.finish()),
319            Arc::new(self.precursor_charge.finish()),
320            Arc::new(self.precursor_intensity.finish()),
321            Arc::new(self.precursor_collision_energy.finish()),
322            Arc::new(self.precursor_ce_is_nce.finish()),
323            Arc::new(self.precursor_native_id.finish()),
324            Arc::new(self.precursor_activation.finish()),
325            Arc::new(self.precursor_analyzer.finish()),
326            Arc::new(self.mz.finish()),
327            Arc::new(self.intensity.finish()),
328            Arc::new(self.inv_mobility_per_peak.finish()),
329            Arc::new(self.mobility_array_kind_col.finish()),
330        ];
331        RecordBatch::try_new(self.schema.clone(), arrays)
332    }
333}
334
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{PrecursorInfo, SpectrumRecord};
    use arrow_array::Array;

    /// Synthetic record with `n_peaks` peaks.
    ///
    /// MS2+ records get a precursor; per-peak mobility is added when
    /// `with_mob` is set.
    fn rec(index: usize, ms_level: u32, n_peaks: usize, with_mob: bool) -> SpectrumRecord {
        let mz: Vec<f64> = (0..n_peaks).map(|i| 100.0 + i as f64).collect();
        let intensity: Vec<f32> = (0..n_peaks).map(|i| 10.0 + i as f32).collect();
        let mobility = if with_mob {
            Some((0..n_peaks).map(|i| 0.5 + i as f32 * 0.01).collect())
        } else {
            None
        };
        SpectrumRecord {
            index,
            scan_number: (index + 1) as u32,
            native_id: format!("scan={}", index + 1),
            ms_level,
            polarity: Some(Polarity::Positive),
            scan_mode: Some(ScanMode::Centroid),
            analyzer: Some(Analyzer::TOFMS),
            filter: None,
            retention_time_sec: index as f64,
            total_ion_current: Some(intensity.iter().map(|&v| v as f64).sum()),
            base_peak_mz: mz.last().copied(),
            base_peak_intensity: intensity.last().map(|&v| v as f64),
            low_mz: mz.first().copied(),
            high_mz: mz.last().copied(),
            ion_injection_time_ms: None,
            inv_mobility: None,
            precursor: if ms_level >= 2 {
                Some(PrecursorInfo {
                    target_mz: Some(500.0),
                    selected_mz: Some(500.5),
                    isolation_width: Some(2.0),
                    charge: Some(2),
                    ..Default::default()
                })
            } else {
                None
            },
            mz,
            intensity,
            inv_mobility_per_peak: mobility,
        }
    }

    #[test]
    fn schema_round_trip() {
        let mut b = SpectrumBatchBuilder::new(Some(MobilityArrayKind::DriftTimeMilliseconds));
        b.push(&rec(0, 1, 3, true));
        b.push(&rec(1, 2, 4, false));
        let batch = b.finish().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.schema().fields().len(), 30);
        let mz_col = batch
            .column_by_name("mz")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::LargeListArray>()
            .unwrap();
        assert_eq!(mz_col.value_length(0), 3);
        assert_eq!(mz_col.value_length(1), 4);
        let mob_col = batch
            .column_by_name("inv_mobility_per_peak")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::LargeListArray>()
            .unwrap();
        assert!(mob_col.is_valid(0));
        assert!(mob_col.is_null(1));
    }

    #[test]
    fn peak_values_are_preserved() {
        let mut b = SpectrumBatchBuilder::new(None);
        b.push(&rec(0, 1, 2, false));
        b.push(&rec(1, 1, 4, false));
        let batch = b.finish().unwrap();
        let mz_col = batch
            .column_by_name("mz")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::LargeListArray>()
            .unwrap();
        // Row 1 must start at its own first peak, not continue row 0's values.
        let row1 = mz_col.value(1);
        let row1 = row1
            .as_any()
            .downcast_ref::<arrow_array::Float64Array>()
            .unwrap();
        assert_eq!(row1.len(), 4);
        assert_eq!(row1.value(0), 100.0);
        assert_eq!(row1.value(3), 103.0);
    }

    #[test]
    fn empty_builder_yields_empty_batch_with_canonical_schema() {
        let b = SpectrumBatchBuilder::new(None);
        assert!(b.is_empty());
        assert_eq!(b.len(), 0);
        let batch = b.finish().unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema(), spectrum_record_schema());
    }

    #[test]
    fn ms1_rows_have_null_precursor_columns() {
        let mut b = SpectrumBatchBuilder::new(None);
        b.push(&rec(0, 1, 2, false));
        b.push(&rec(1, 2, 2, false));
        assert_eq!(b.len(), 2);
        let batch = b.finish().unwrap();

        for name in [
            "precursor_target_mz",
            "precursor_selected_mz",
            "precursor_isolation_width",
            "precursor_charge",
            "precursor_intensity",
            "precursor_collision_energy",
            "precursor_ce_is_nce",
            "precursor_native_id",
            "precursor_activation",
            "precursor_analyzer",
        ] {
            let col = batch.column_by_name(name).unwrap();
            assert!(col.is_null(0), "{} should be null for an MS1 row", name);
        }

        // The MS2 row carries the fields `rec` fills in.
        for name in [
            "precursor_target_mz",
            "precursor_selected_mz",
            "precursor_isolation_width",
            "precursor_charge",
            "precursor_ce_is_nce",
        ] {
            let col = batch.column_by_name(name).unwrap();
            assert!(col.is_valid(1), "{} should be set for an MS2 row", name);
        }
    }

    #[test]
    fn mobility_kind_is_repeated_on_every_row() {
        let mut b = SpectrumBatchBuilder::new(Some(MobilityArrayKind::InverseReducedVsPerCm2));
        b.push(&rec(0, 1, 1, true));
        b.push(&rec(1, 1, 1, false));
        let batch = b.finish().unwrap();
        let kind = batch
            .column_by_name("mobility_array_kind")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        assert_eq!(kind.value(0), "inverse_reduced_k0");
        assert_eq!(kind.value(1), "inverse_reduced_k0");

        let mut b = SpectrumBatchBuilder::new(None);
        b.push(&rec(0, 1, 1, false));
        let batch = b.finish().unwrap();
        assert_eq!(
            batch
                .column_by_name("mobility_array_kind")
                .unwrap()
                .null_count(),
            1
        );
    }
}
408}