1use std::sync::Arc;
22
23use arrow_array::builder::{
24 ArrayBuilder, Float32Builder, Float64Builder, Int32Builder, LargeListBuilder, StringBuilder,
25 UInt32Builder, UInt8Builder,
26};
27use arrow_array::{ArrayRef, RecordBatch};
28use arrow_schema::{DataType, Field, Schema, SchemaRef};
29
30use crate::{Activation, Analyzer, MobilityArrayKind, Polarity, ScanMode, SpectrumRecord};
31
32pub fn spectrum_record_schema() -> SchemaRef {
34 let mz_item = Arc::new(Field::new("item", DataType::Float64, false));
35 let int_item = Arc::new(Field::new("item", DataType::Float32, false));
36 let mob_item = Arc::new(Field::new("item", DataType::Float32, false));
37 Arc::new(Schema::new(vec![
38 Field::new("index", DataType::UInt32, false),
39 Field::new("scan_number", DataType::UInt32, false),
40 Field::new("native_id", DataType::Utf8, false),
41 Field::new("ms_level", DataType::UInt8, false),
42 Field::new("polarity", DataType::Utf8, true),
43 Field::new("scan_mode", DataType::Utf8, true),
44 Field::new("analyzer", DataType::Utf8, true),
45 Field::new("filter", DataType::Utf8, true),
46 Field::new("retention_time_sec", DataType::Float64, false),
47 Field::new("total_ion_current", DataType::Float64, true),
48 Field::new("base_peak_mz", DataType::Float64, true),
49 Field::new("base_peak_intensity", DataType::Float64, true),
50 Field::new("low_mz", DataType::Float64, true),
51 Field::new("high_mz", DataType::Float64, true),
52 Field::new("ion_injection_time_ms", DataType::Float64, true),
53 Field::new("inv_mobility", DataType::Float64, true),
54 Field::new("precursor_target_mz", DataType::Float64, true),
55 Field::new("precursor_selected_mz", DataType::Float64, true),
56 Field::new("precursor_isolation_width", DataType::Float64, true),
57 Field::new("precursor_charge", DataType::Int32, true),
58 Field::new("precursor_intensity", DataType::Float64, true),
59 Field::new("precursor_collision_energy", DataType::Float64, true),
60 Field::new("precursor_ce_is_nce", DataType::UInt8, true),
61 Field::new("precursor_native_id", DataType::Utf8, true),
62 Field::new("precursor_activation", DataType::Utf8, true),
63 Field::new("precursor_analyzer", DataType::Utf8, true),
64 Field::new_large_list("mz", mz_item, false),
65 Field::new_large_list("intensity", int_item, false),
66 Field::new_large_list("inv_mobility_per_peak", mob_item, true),
67 Field::new("mobility_array_kind", DataType::Utf8, true),
68 ]))
69}
70
71fn polarity_str(p: Polarity) -> &'static str {
72 match p {
73 Polarity::Positive => "positive",
74 Polarity::Negative => "negative",
75 }
76}
77
78fn scan_mode_str(m: ScanMode) -> &'static str {
79 match m {
80 ScanMode::Profile => "profile",
81 ScanMode::Centroid => "centroid",
82 }
83}
84
85fn analyzer_str(a: Analyzer) -> &'static str {
86 match a {
87 Analyzer::ITMS => "itms",
88 Analyzer::TQMS => "tqms",
89 Analyzer::SQMS => "sqms",
90 Analyzer::TOFMS => "tof",
91 Analyzer::FTMS => "ftms",
92 Analyzer::Sector => "sector",
93 }
94}
95
96fn activation_str(a: Activation) -> &'static str {
97 match a {
98 Activation::CID => "cid",
99 Activation::HCD => "hcd",
100 Activation::ETD => "etd",
101 Activation::ECD => "ecd",
102 Activation::UVPD => "uvpd",
103 Activation::PQD => "pqd",
104 Activation::PD => "pd",
105 Activation::SID => "sid",
106 Activation::EThcD => "ethcd",
107 Activation::IRMPD => "irmpd",
108 Activation::MPID => "mpid",
109 }
110}
111
112fn mobility_kind_str(k: MobilityArrayKind) -> &'static str {
113 match k {
114 MobilityArrayKind::InverseReducedVsPerCm2 => "inverse_reduced_k0",
115 MobilityArrayKind::DriftTimeMilliseconds => "drift_time_ms",
116 }
117}
118
119pub struct SpectrumBatchBuilder {
126 schema: SchemaRef,
127 mobility_kind: Option<MobilityArrayKind>,
128 index: UInt32Builder,
129 scan_number: UInt32Builder,
130 native_id: StringBuilder,
131 ms_level: UInt8Builder,
132 polarity: StringBuilder,
133 scan_mode: StringBuilder,
134 analyzer: StringBuilder,
135 filter: StringBuilder,
136 retention_time_sec: Float64Builder,
137 total_ion_current: Float64Builder,
138 base_peak_mz: Float64Builder,
139 base_peak_intensity: Float64Builder,
140 low_mz: Float64Builder,
141 high_mz: Float64Builder,
142 ion_injection_time_ms: Float64Builder,
143 inv_mobility: Float64Builder,
144 precursor_target_mz: Float64Builder,
145 precursor_selected_mz: Float64Builder,
146 precursor_isolation_width: Float64Builder,
147 precursor_charge: Int32Builder,
148 precursor_intensity: Float64Builder,
149 precursor_collision_energy: Float64Builder,
150 precursor_ce_is_nce: UInt8Builder,
151 precursor_native_id: StringBuilder,
152 precursor_activation: StringBuilder,
153 precursor_analyzer: StringBuilder,
154 mz: LargeListBuilder<Float64Builder>,
155 intensity: LargeListBuilder<Float32Builder>,
156 inv_mobility_per_peak: LargeListBuilder<Float32Builder>,
157 mobility_array_kind_col: StringBuilder,
158}
159
160impl SpectrumBatchBuilder {
161 pub fn new(mobility_kind: Option<MobilityArrayKind>) -> Self {
165 Self {
166 schema: spectrum_record_schema(),
167 mobility_kind,
168 index: UInt32Builder::new(),
169 scan_number: UInt32Builder::new(),
170 native_id: StringBuilder::new(),
171 ms_level: UInt8Builder::new(),
172 polarity: StringBuilder::new(),
173 scan_mode: StringBuilder::new(),
174 analyzer: StringBuilder::new(),
175 filter: StringBuilder::new(),
176 retention_time_sec: Float64Builder::new(),
177 total_ion_current: Float64Builder::new(),
178 base_peak_mz: Float64Builder::new(),
179 base_peak_intensity: Float64Builder::new(),
180 low_mz: Float64Builder::new(),
181 high_mz: Float64Builder::new(),
182 ion_injection_time_ms: Float64Builder::new(),
183 inv_mobility: Float64Builder::new(),
184 precursor_target_mz: Float64Builder::new(),
185 precursor_selected_mz: Float64Builder::new(),
186 precursor_isolation_width: Float64Builder::new(),
187 precursor_charge: Int32Builder::new(),
188 precursor_intensity: Float64Builder::new(),
189 precursor_collision_energy: Float64Builder::new(),
190 precursor_ce_is_nce: UInt8Builder::new(),
191 precursor_native_id: StringBuilder::new(),
192 precursor_activation: StringBuilder::new(),
193 precursor_analyzer: StringBuilder::new(),
194 mz: LargeListBuilder::new(Float64Builder::new()).with_field(Arc::new(Field::new(
195 "item",
196 DataType::Float64,
197 false,
198 ))),
199 intensity: LargeListBuilder::new(Float32Builder::new())
200 .with_field(Arc::new(Field::new("item", DataType::Float32, false))),
201 inv_mobility_per_peak: LargeListBuilder::new(Float32Builder::new())
202 .with_field(Arc::new(Field::new("item", DataType::Float32, false))),
203 mobility_array_kind_col: StringBuilder::new(),
204 }
205 }
206
207 pub fn schema(&self) -> SchemaRef {
209 self.schema.clone()
210 }
211
212 pub fn push(&mut self, rec: &SpectrumRecord) {
214 self.index.append_value(rec.index as u32);
215 self.scan_number.append_value(rec.scan_number);
216 self.native_id.append_value(&rec.native_id);
217 self.ms_level.append_value(rec.ms_level as u8);
218 self.polarity.append_option(rec.polarity.map(polarity_str));
219 self.scan_mode
220 .append_option(rec.scan_mode.map(scan_mode_str));
221 self.analyzer.append_option(rec.analyzer.map(analyzer_str));
222 self.filter.append_option(rec.filter.as_deref());
223 self.retention_time_sec.append_value(rec.retention_time_sec);
224 self.total_ion_current.append_option(rec.total_ion_current);
225 self.base_peak_mz.append_option(rec.base_peak_mz);
226 self.base_peak_intensity
227 .append_option(rec.base_peak_intensity);
228 self.low_mz.append_option(rec.low_mz);
229 self.high_mz.append_option(rec.high_mz);
230 self.ion_injection_time_ms
231 .append_option(rec.ion_injection_time_ms);
232 self.inv_mobility.append_option(rec.inv_mobility);
233
234 match &rec.precursor {
235 Some(p) => {
236 self.precursor_target_mz.append_option(p.target_mz);
237 self.precursor_selected_mz.append_option(p.selected_mz);
238 self.precursor_isolation_width
239 .append_option(p.isolation_width);
240 self.precursor_charge.append_option(p.charge);
241 self.precursor_intensity.append_option(p.intensity);
242 self.precursor_collision_energy
243 .append_option(p.collision_energy);
244 self.precursor_ce_is_nce.append_value(u8::from(p.ce_is_nce));
245 self.precursor_native_id
246 .append_option(p.precursor_native_id.as_deref());
247 self.precursor_activation
248 .append_option(p.activation.map(activation_str));
249 self.precursor_analyzer
250 .append_option(p.analyzer.map(analyzer_str));
251 }
252 None => {
253 self.precursor_target_mz.append_null();
254 self.precursor_selected_mz.append_null();
255 self.precursor_isolation_width.append_null();
256 self.precursor_charge.append_null();
257 self.precursor_intensity.append_null();
258 self.precursor_collision_energy.append_null();
259 self.precursor_ce_is_nce.append_null();
260 self.precursor_native_id.append_null();
261 self.precursor_activation.append_null();
262 self.precursor_analyzer.append_null();
263 }
264 }
265
266 for &v in &rec.mz {
267 self.mz.values().append_value(v);
268 }
269 self.mz.append(true);
270 for &v in &rec.intensity {
271 self.intensity.values().append_value(v);
272 }
273 self.intensity.append(true);
274 match &rec.inv_mobility_per_peak {
275 Some(mob) => {
276 for &v in mob {
277 self.inv_mobility_per_peak.values().append_value(v);
278 }
279 self.inv_mobility_per_peak.append(true);
280 }
281 None => self.inv_mobility_per_peak.append(false),
282 }
283 self.mobility_array_kind_col
284 .append_option(self.mobility_kind.map(mobility_kind_str));
285 }
286
287 pub fn len(&self) -> usize {
289 self.index.len()
290 }
291
292 pub fn is_empty(&self) -> bool {
294 self.len() == 0
295 }
296
297 pub fn finish(mut self) -> Result<RecordBatch, arrow_schema::ArrowError> {
299 let arrays: Vec<ArrayRef> = vec![
300 Arc::new(self.index.finish()),
301 Arc::new(self.scan_number.finish()),
302 Arc::new(self.native_id.finish()),
303 Arc::new(self.ms_level.finish()),
304 Arc::new(self.polarity.finish()),
305 Arc::new(self.scan_mode.finish()),
306 Arc::new(self.analyzer.finish()),
307 Arc::new(self.filter.finish()),
308 Arc::new(self.retention_time_sec.finish()),
309 Arc::new(self.total_ion_current.finish()),
310 Arc::new(self.base_peak_mz.finish()),
311 Arc::new(self.base_peak_intensity.finish()),
312 Arc::new(self.low_mz.finish()),
313 Arc::new(self.high_mz.finish()),
314 Arc::new(self.ion_injection_time_ms.finish()),
315 Arc::new(self.inv_mobility.finish()),
316 Arc::new(self.precursor_target_mz.finish()),
317 Arc::new(self.precursor_selected_mz.finish()),
318 Arc::new(self.precursor_isolation_width.finish()),
319 Arc::new(self.precursor_charge.finish()),
320 Arc::new(self.precursor_intensity.finish()),
321 Arc::new(self.precursor_collision_energy.finish()),
322 Arc::new(self.precursor_ce_is_nce.finish()),
323 Arc::new(self.precursor_native_id.finish()),
324 Arc::new(self.precursor_activation.finish()),
325 Arc::new(self.precursor_analyzer.finish()),
326 Arc::new(self.mz.finish()),
327 Arc::new(self.intensity.finish()),
328 Arc::new(self.inv_mobility_per_peak.finish()),
329 Arc::new(self.mobility_array_kind_col.finish()),
330 ];
331 RecordBatch::try_new(self.schema.clone(), arrays)
332 }
333}
334
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{PrecursorInfo, SpectrumRecord};
    use arrow_array::Array;

    /// Builds a synthetic spectrum with `n_peaks` peaks.
    ///
    /// MS2+ records get a precursor; per-peak mobility is included only when
    /// `with_mob` is set.
    fn rec(index: usize, ms_level: u32, n_peaks: usize, with_mob: bool) -> SpectrumRecord {
        let mz: Vec<f64> = (0..n_peaks).map(|i| 100.0 + i as f64).collect();
        let intensity: Vec<f32> = (0..n_peaks).map(|i| 10.0 + i as f32).collect();
        let mobility = if with_mob {
            Some((0..n_peaks).map(|i| 0.5 + i as f32 * 0.01).collect())
        } else {
            None
        };
        SpectrumRecord {
            index,
            scan_number: (index + 1) as u32,
            native_id: format!("scan={}", index + 1),
            ms_level,
            polarity: Some(Polarity::Positive),
            scan_mode: Some(ScanMode::Centroid),
            analyzer: Some(Analyzer::TOFMS),
            filter: None,
            retention_time_sec: index as f64,
            total_ion_current: Some(intensity.iter().map(|&v| v as f64).sum()),
            base_peak_mz: mz.last().copied(),
            base_peak_intensity: intensity.last().map(|&v| v as f64),
            low_mz: mz.first().copied(),
            high_mz: mz.last().copied(),
            ion_injection_time_ms: None,
            inv_mobility: None,
            precursor: if ms_level >= 2 {
                Some(PrecursorInfo {
                    target_mz: Some(500.0),
                    selected_mz: Some(500.5),
                    isolation_width: Some(2.0),
                    charge: Some(2),
                    ..Default::default()
                })
            } else {
                None
            },
            mz,
            intensity,
            inv_mobility_per_peak: mobility,
        }
    }

    /// Looks up column `name` and downcasts it to the concrete array type `A`.
    fn col<'a, A: 'static>(batch: &'a RecordBatch, name: &str) -> &'a A {
        batch
            .column_by_name(name)
            .unwrap_or_else(|| panic!("missing column `{name}`"))
            .as_any()
            .downcast_ref::<A>()
            .unwrap_or_else(|| panic!("column `{name}` has an unexpected array type"))
    }

    #[test]
    fn schema_round_trip() {
        let mut b = SpectrumBatchBuilder::new(Some(MobilityArrayKind::DriftTimeMilliseconds));
        b.push(&rec(0, 1, 3, true));
        b.push(&rec(1, 2, 4, false));
        assert_eq!(b.len(), 2);
        let batch = b.finish().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.schema().fields().len(), 30);
        let mz_col = col::<arrow_array::LargeListArray>(&batch, "mz");
        assert_eq!(mz_col.value_length(0), 3);
        assert_eq!(mz_col.value_length(1), 4);
        let mob_col = col::<arrow_array::LargeListArray>(&batch, "inv_mobility_per_peak");
        assert!(mob_col.is_valid(0));
        assert!(mob_col.is_null(1));
    }

    #[test]
    fn precursor_columns_are_null_without_precursor() {
        let mut b = SpectrumBatchBuilder::new(None);
        b.push(&rec(0, 1, 2, false));
        b.push(&rec(1, 2, 2, false));
        let batch = b.finish().unwrap();

        let target = col::<arrow_array::Float64Array>(&batch, "precursor_target_mz");
        assert!(target.is_null(0));
        assert_eq!(target.value(1), 500.0);

        let charge = col::<arrow_array::Int32Array>(&batch, "precursor_charge");
        assert!(charge.is_null(0));
        assert_eq!(charge.value(1), 2);

        let nce = col::<arrow_array::UInt8Array>(&batch, "precursor_ce_is_nce");
        assert!(nce.is_null(0));
        assert!(nce.is_valid(1));

        // With no builder-level mobility kind, the column is null on every row.
        let kind = col::<arrow_array::StringArray>(&batch, "mobility_array_kind");
        assert_eq!(kind.null_count(), 2);
    }

    #[test]
    fn peak_values_and_mobility_kind_are_written() {
        let mut b = SpectrumBatchBuilder::new(Some(MobilityArrayKind::InverseReducedVsPerCm2));
        b.push(&rec(0, 1, 3, true));
        let batch = b.finish().unwrap();

        let kind = col::<arrow_array::StringArray>(&batch, "mobility_array_kind");
        assert_eq!(kind.value(0), "inverse_reduced_k0");

        let mz_row_ref = col::<arrow_array::LargeListArray>(&batch, "mz").value(0);
        let mz_row = mz_row_ref
            .as_any()
            .downcast_ref::<arrow_array::Float64Array>()
            .unwrap();
        assert_eq!(mz_row.values().to_vec(), vec![100.0, 101.0, 102.0]);

        let int_row_ref = col::<arrow_array::LargeListArray>(&batch, "intensity").value(0);
        let int_row = int_row_ref
            .as_any()
            .downcast_ref::<arrow_array::Float32Array>()
            .unwrap();
        assert_eq!(int_row.values().to_vec(), vec![10.0f32, 11.0, 12.0]);

        let mob_col = col::<arrow_array::LargeListArray>(&batch, "inv_mobility_per_peak");
        assert_eq!(mob_col.value_length(0), 3);
    }

    #[test]
    fn empty_builder_yields_empty_batch() {
        let b = SpectrumBatchBuilder::new(None);
        assert!(b.is_empty());
        assert_eq!(b.len(), 0);
        let batch = b.finish().unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema(), spectrum_record_schema());
    }
}