// mzdata 0.60.3
//
// A library to read mass spectrometry data formats and a data model for mass spectra.
// Documentation
use std::any::Any;
use std::fs::File;
use std::io;
use std::path::PathBuf;
use std::sync::{mpsc::sync_channel, Arc, atomic::{AtomicU64, Ordering as AtomicOrdering}};
use std::thread;
use std::time;

use clap::Parser;

use log::info;
use mzdata::io::MassSpectrometryFormat;
use mzdata::io::{checksum_file, MassSpectrometryReadWriteProcess, Sink, Source};
use mzdata::meta::custom_software_name;
use mzdata::meta::Software;
use mzdata::meta::{DataProcessing, ProcessingMethod, SourceFile};
use mzdata::params::ControlledVocabulary;
use mzdata::prelude::*;

use mzdata::spectrum::bindata::BinaryCompressionType;
use mzdata::spectrum::ArrayType;
use mzdata::spectrum::ArrayType::IntensityArray;
use mzdata::spectrum::ArrayType::MZArray;
use mzdata::spectrum::BinaryDataArrayType;
use mzdata::MzMLWriter;
use mzpeaks::{CentroidPeak, DeconvolutedPeak};

/// Parse a compression method name supplied on the command line.
///
/// The name is compared against the parameter name of every entry in
/// `BinaryCompressionType::COMPRESSION_METHODS`. The trailing
/// `" compression"` may be omitted by the user; it is appended before the
/// comparison when missing (so `zlib` matches `zlib compression`).
///
/// # Errors
///
/// Returns a message naming the requested method when no known compression
/// method carries that name.
fn compression_parser(compression: &str) -> Result<BinaryCompressionType, String> {
    const SUFFIX: &str = " compression";

    let name = if compression.ends_with(SUFFIX) {
        compression.to_string()
    } else {
        format!("{compression}{SUFFIX}")
    };

    BinaryCompressionType::COMPRESSION_METHODS
        .iter()
        // A method without an associated parameter simply cannot match; do not
        // panic on it while parsing user input.
        .find(|method| {
            method
                .as_param()
                .is_some_and(|param| param.name() == name)
        })
        .copied()
        .ok_or_else(|| format!("unrecognized compression method: {name:?}"))
}

/// Convert a mass spectrometry data file from one format to another.
///
/// Spectra are read from the input on one thread and written to the output
/// on another, with a bounded queue between them.
#[derive(Debug, Clone, Parser)]
pub struct MZConvert {
    /// Path to the input file, or `-` to read from STDIN
    #[arg()]
    pub inpath: String,

    /// Path to the output file, or `-` to write mzML to STDOUT
    #[arg()]
    pub outpath: String,

    /// Maximum number of spectra queued between the reader and the writer
    #[arg(short = 'b', long, default_value_t = 8192)]
    pub buffer_size: usize,

    // NOTE: the three compression settings below are only applied when the
    // output writer is an `MzMLWriter<io::BufWriter<File>>` (see
    // `configure_writer`). The " compression" suffix of a method name may be
    // omitted on the command line (see `compression_parser`).

    /// Compression method for m/z arrays (e.g. `zlib`)
    #[arg(long, value_parser = compression_parser, default_value = "zlib compression")]
    pub mz_compression: BinaryCompressionType,

    /// Compression method for intensity arrays (e.g. `zlib`)
    #[arg(long, value_parser = compression_parser, default_value = "zlib compression")]
    pub intensity_compression: BinaryCompressionType,

    /// Compression method for ion mobility arrays (e.g. `zlib`)
    #[arg(long, value_parser = compression_parser, default_value = "zlib compression")]
    pub ion_mobility_compression: BinaryCompressionType,
}

impl MZConvert {
    /// Create a conversion job for `inpath` -> `outpath` using the same
    /// defaults as the command line: a queue of 8192 spectra and zlib
    /// compression for m/z, intensity and ion mobility arrays.
    pub fn new(inpath: String, outpath: String) -> Self {
        let zlib = BinaryCompressionType::Zlib;
        Self {
            buffer_size: 8192,
            mz_compression: zlib,
            intensity_compression: zlib,
            ion_mobility_compression: zlib,
            inpath,
            outpath,
        }
    }

    /// Run the conversion described by this job.
    ///
    /// An input path of `-` selects STDIN as the source; an output path of
    /// `-` selects STDOUT as an mzML sink. Any other value is converted into
    /// a `Source`/`Sink` from the path string. The pair is then handed to
    /// `open_reader`, whose result is returned.
    pub fn main(&self) -> io::Result<()> {
        let source = match self.inpath.as_str() {
            "-" => Source::Stdin,
            _ => Source::<_, _>::from(self.inpath.as_ref()),
        };

        let sink = match self.outpath.as_str() {
            "-" => Sink::Writer(Box::new(io::stdout()), MassSpectrometryFormat::MzML),
            _ => Sink::<CentroidPeak, DeconvolutedPeak>::from(self.outpath.as_ref()),
        };

        self.open_reader(source, sink)
    }

    fn task<R: SpectrumSource + Send + 'static, W: SpectrumWriter + Send + 'static>(
        &self,
        reader: R,
        mut writer: W,
    ) -> io::Result<()> {
        let (send, recv) = sync_channel(self.buffer_size);
        let buffered= Arc::new(AtomicU64::default());
        let buffered_w = Arc::clone(&buffered);
        let reader_handle = thread::spawn(move || {
            reader.enumerate().for_each(|(i, s)| {
                let waiting_cnt = buffered.fetch_add(1, AtomicOrdering::SeqCst);
                if i % 5000 == 0 && i > 0 {
                    log::info!("Reading {} {} ({waiting_cnt} to write)", i, s.id());
                }
                if i % 100 == 0 && i > 0 {
                    log::debug!("Reading {} {} ({waiting_cnt} to write)", i, s.id());
                }

                send.send(s).unwrap()
            });
        });

        let writer_handle = thread::spawn(move || {
            for s in recv.iter() {
                let i = s.index();
                buffered_w.fetch_sub(1, AtomicOrdering::SeqCst);
                writer.write_owned(s).inspect_err(|e| {
                    log::error!("Failed to write spectrum {i}: {e}")
                }).unwrap();
            }
            writer.close().unwrap();
        });

        reader_handle.join().unwrap();
        writer_handle.join().unwrap();
        Ok(())
    }

    /// Apply format-specific settings to `reader` before conversion starts.
    ///
    /// The reader is inspected through `Any`: when built with the `thermo`
    /// feature and the reader is a `ThermoRawReader`, loading of extended
    /// spectrum data is switched off; when built with the `bruker_tdf`
    /// feature and the reader is a `TDFSpectrumReader`, peak consolidation is
    /// switched off. Any other reader type is left untouched.
    // Both branches are feature-gated, so `reader` is unused when neither
    // feature is enabled.
    #[allow(unused)]
    fn configure_reader(&self, reader: &mut dyn Any) {
        #[cfg(feature = "thermo")]
        if let Some(thermo) = reader.downcast_mut::<mzdata::io::thermo::ThermoRawReader>() {
            thermo.set_load_extended_spectrum_data(false);
        }
        #[cfg(feature = "bruker_tdf")]
        if let Some(tdf) = reader.downcast_mut::<mzdata::io::tdf::TDFSpectrumReader>() {
            tdf.set_consolidate_peaks(false);
        }
    }

    /// Apply the configured compression methods to `writer`.
    ///
    /// Only an `MzMLWriter<io::BufWriter<File>>` is configured; any other
    /// writer type is left as-is. For that writer, `mz_compression` is set
    /// for m/z arrays, `intensity_compression` for intensity arrays and
    /// `ion_mobility_compression` for each of the ion mobility / drift time
    /// array types, in every case for both 32-bit and 64-bit float data.
    fn configure_writer(&self, writer: &mut dyn Any) {
        info!("Configuring writer...");
        let Some(mzml) = writer.downcast_mut::<MzMLWriter<io::BufWriter<File>>>() else {
            return;
        };

        log::debug!("Configuring compression methods: {:?}", self.mz_compression);
        for dtype in [BinaryDataArrayType::Float32, BinaryDataArrayType::Float64] {
            mzml.set_compression_method(MZArray, dtype, self.mz_compression);
        }

        log::debug!(
            "Configuring compression methods: {:?}",
            self.intensity_compression
        );
        for dtype in [BinaryDataArrayType::Float32, BinaryDataArrayType::Float64] {
            mzml.set_compression_method(IntensityArray, dtype, self.intensity_compression);
        }

        log::debug!(
            "Configuring compression methods: {:?}",
            self.ion_mobility_compression
        );
        let ion_mobility_arrays = [
            ArrayType::MeanInverseReducedIonMobilityArray,
            ArrayType::RawDriftTimeArray,
            ArrayType::MeanDriftTimeArray,
            ArrayType::RawIonMobilityArray,
            ArrayType::MeanIonMobilityArray,
        ];
        for im_array in ion_mobility_arrays {
            // Same registration order as before: 64-bit first, then 32-bit.
            for dtype in [BinaryDataArrayType::Float64, BinaryDataArrayType::Float32] {
                mzml.set_compression_method(
                    im_array.clone(),
                    dtype,
                    self.ion_mobility_compression,
                );
            }
        }
    }
}

impl MassSpectrometryReadWriteProcess<CentroidPeak, DeconvolutedPeak> for MZConvert {
    type ErrorType = io::Error;

    fn task<
        R: RandomAccessSpectrumIterator<CentroidPeak, DeconvolutedPeak>
            + SpectrumSource<CentroidPeak, DeconvolutedPeak>
            + Send
            + Any
            + 'static,
        W: SpectrumWriter<CentroidPeak, DeconvolutedPeak> + Send + Any + 'static,
    >(
        &self,
        reader: R,
        writer: W,
    ) -> Result<(), Self::ErrorType> {
        self.task(reader, writer)
    }

    /// Hook invoked with the opened reader: applies `configure_reader` to it
    /// and hands it back. The reported input format is not used.
    fn transform_reader<R>(
        &self,
        mut reader: R,
        _format: MassSpectrometryFormat,
    ) -> Result<R, Self::ErrorType>
    where
        R: RandomAccessSpectrumIterator<CentroidPeak, DeconvolutedPeak>
            + MSDataFileMetadata
            + SpectrumSource<CentroidPeak, DeconvolutedPeak>
            + Send
            + Any
            + 'static,
    {
        self.configure_reader(&mut reader);
        Ok(reader)
    }

    /// Hook invoked with the opened reader and writer: configures the writer
    /// and records this conversion in the writer's metadata.
    ///
    /// Steps, in order:
    /// 1. Apply `configure_writer` to `writer`.
    /// 2. Unless the input is STDIN (`-`), compute the input file's checksum
    ///    (directories get an empty checksum) and, if no source file known to
    ///    the reader already carries that value as its `SHA-1` parameter,
    ///    append a `SourceFile` entry for the input to the writer.
    /// 3. Register an `mzconvert` `Software` entry on the writer.
    /// 4. Add a `ProcessingMethod` referencing that software to every data
    ///    processing entry of the writer, creating a `DP1` entry when the
    ///    writer has none.
    ///
    /// # Errors
    ///
    /// Returns the I/O error raised while computing the input checksum.
    fn transform_writer<
        R: RandomAccessSpectrumIterator<CentroidPeak, DeconvolutedPeak>
            + MSDataFileMetadata
            + SpectrumSource<CentroidPeak, DeconvolutedPeak>
            + Send
            + Any
            + 'static,
        W: SpectrumWriter<CentroidPeak, DeconvolutedPeak> + MSDataFileMetadata + Send + Any + 'static,
    >(
        &self,
        reader: R,
        reader_format: MassSpectrometryFormat,
        mut writer: W,
        writer_format: MassSpectrometryFormat,
    ) -> Result<(R, W), Self::ErrorType> {
        self.configure_writer(&mut writer);
        if self.inpath != "-" {
            let pb = PathBuf::from(&self.inpath);
            let checksum = if pb.is_dir() {
                // A directory cannot be checksummed as a single file.
                Default::default()
            } else {
                info!("Computing checksum for {}", pb.display());
                checksum_file(&pb)?
            };

            // `any`, not `all`: with `all`, an input that lists no source file
            // (or none with a SHA-1) would vacuously count as "already
            // present" and never be recorded.
            let has_already = reader.file_description().source_files.iter().any(|f| {
                f.get_param_by_name("SHA-1")
                    .is_some_and(|c| c.value.as_str() == checksum)
            });

            if !has_already {
                let mut sf = SourceFile {
                    location: pb
                        .parent()
                        .map(|p| format!("file:///{}", p.to_string_lossy()))
                        .unwrap_or_else(|| "file:///".to_string()),
                    name: pb
                        .file_name()
                        .map(|p| p.to_string_lossy().into_owned())
                        .unwrap_or_default(),
                    ..Default::default()
                };
                let par = ControlledVocabulary::MS.param_val(1000569, "SHA-1", checksum);
                sf.add_param(par);
                sf.file_format = reader_format.as_param();

                // Reuse the native ID format of the reader's most recent
                // source file, when it has one.
                if let Some(ref_sf) = reader.file_description().source_files.last() {
                    sf.id_format = ref_sf.id_format.clone()
                }
                writer.file_description_mut().source_files.push(sf);
            }
        }

        let mut sw = Software {
            version: format!("v{}", option_env!("CARGO_PKG_VERSION").unwrap_or("?")),
            id: Software::find_unique_id("mzconvert", writer.softwares()),
            ..Default::default()
        };
        sw.add_param(custom_software_name("mzconvert"));

        let mut method = ProcessingMethod {
            software_reference: sw.id.clone(),
            ..Default::default()
        };
        writer.softwares_mut().push(sw);

        if let Some(conv) = writer_format.as_conversion() {
            method.add_param(conv.into())
        }

        if writer.data_processings().is_empty() {
            let mut dp = DataProcessing::default();
            method.order = 0;
            dp.push(method.clone());
            dp.id = "DP1".to_string();
            writer.data_processings_mut().push(dp)
        } else {
            // Append this conversion as the next step of every existing
            // processing pipeline.
            for dp in writer.data_processings_mut() {
                let mut next_step = method.clone();
                next_step.order = dp.highest_order() + 1;
                dp.push(next_step);
            }
        }
        Ok((reader, writer))
    }
}

/// Program entry point: initialise logging, parse the command line into an
/// `MZConvert` job, run it, and report the wall-clock duration on STDERR.
fn main() -> io::Result<()> {
    env_logger::init();

    let job = MZConvert::parse();
    let started = time::Instant::now();
    job.main()?;
    eprintln!(
        "Conversion finished: {:0.2} seconds",
        started.elapsed().as_secs_f64()
    );
    Ok(())
}