mzdata 0.63.4

A library to read mass spectrometry data formats and a data model for mass spectra
import sys
import gzip
import json
import io
import itertools
import re

from enum import IntFlag
from typing import Tuple, Dict, Set, List

import fastobo
from fastobo.term import (
    TermFrame,
    IsAClause,
    NameClause,
    DefClause,
)

from fastobo.doc import OboDoc

from fastobo.id import PrefixedIdent

# Identifiers of the three software category terms. An `is_a` link from a term
# to one of these sets the matching `SoftwareType` flag in `make_entry_for`.
ACQUISITION_SW = PrefixedIdent("MS", "1001455")
ANALYSIS_SW = PrefixedIdent("MS", "1001456")
DP_SW = PrefixedIdent("MS", "1001457")

# Matches an underscore followed by a single ASCII letter; used together with
# `format_name` to collapse `_x` into `X` when building variant names.
segment_pattern = re.compile(r"(_[a-zA-Z])")

class SoftwareType(IntFlag):
    """Bit flags describing which software categories a term belongs to.

    Members can be OR-ed together; the combined value is emitted as a plain
    integer in the generated `flags={...}` attribute.
    """

    NoType = 0
    Analysis = 1 << 0
    DataProcessing = 1 << 1
    Acquisition = 1 << 2


def collect_software_types(cv: OboDoc) -> Tuple[Set[PrefixedIdent], Dict[PrefixedIdent, TermFrame]]:
    """Collect every term that descends from the root term ``MS:1000531``.

    The document is scanned once to index each frame by id and to record, for
    every `is_a` target, which terms point at it. The descendants of the root
    are then gathered by walking that index, so the result does not depend on
    the order in which terms appear in the document or on how deep the
    hierarchy is.

    Parameters
    ----------
    cv:
        The parsed OBO document to scan.

    Returns
    -------
    A pair ``(software_ids, id_to_clause)`` where ``software_ids`` is the set
    containing the root id and the ids of all of its transitive `is_a`
    descendants, and ``id_to_clause`` maps the id of every frame in the
    document to that frame.
    """
    id_to_clause: Dict[PrefixedIdent, TermFrame] = {}
    children_of: Dict[PrefixedIdent, List[PrefixedIdent]] = {}
    term: TermFrame
    for term in cv:
        id_to_clause[term.id] = term
        for clause in term:
            if isinstance(clause, IsAClause):
                children_of.setdefault(clause.term, []).append(term.id)

    root = PrefixedIdent("MS", "1000531")
    software_ids = {root}
    # Depth-first walk over the child index; the membership check both avoids
    # repeated work and guards against cycles in the `is_a` graph.
    pending = [root]
    while pending:
        parent = pending.pop()
        for child in children_of.get(parent, ()):
            if child not in software_ids:
                software_ids.add(child)
                pending.append(child)
    return software_ids, id_to_clause

def format_name(match: re.Match) -> str:
    """Return the last character of the match's first group, upper-cased.

    Intended as the replacement callback for `segment_pattern`, where the
    group is an underscore followed by a letter, so `_x` becomes `X`.
    """
    captured = match.group(1)
    final_char = captured[-1]
    return final_char.upper()

# Single-pass character substitutions applied to a term name before it is
# turned into an enum variant identifier.
_VARIANT_NAME_TRANSLATION = str.maketrans(
    {
        "-": "_",
        ":": "_",
        "/": "_",
        "!": "_",
        " ": "_",
        "+": "plus",
    }
)


def make_entry_for(term: TermFrame):
    """Render one term as an enum variant with `#[term(...)]` and `#[doc]` attributes.

    The term's name, definition and `is_a` parents are read from its clauses.
    Parents matching `DP_SW`, `ANALYSIS_SW` or `ACQUISITION_SW` contribute the
    corresponding `SoftwareType` flag. The variant identifier is derived from
    the name by substituting punctuation, capitalising "software", collapsing
    `_x` into `X`, upper-casing the first character and prefixing `_` when the
    result starts with a digit.

    Parameters
    ----------
    term:
        The term frame to render.

    Returns
    -------
    The text of the variant entry, starting with a newline and ending with a
    trailing comma.

    Raises
    ------
    ValueError
        If the term has no name clause or its name is empty, since no variant
        identifier can be derived from it.
    """
    name = None
    flags = SoftwareType.NoType
    parents = []
    descr = ''
    for clause in term:
        if isinstance(clause, NameClause):
            name = str(clause.name)
        elif isinstance(clause, IsAClause):
            parents.append(str(clause.term))
            if clause.term == DP_SW:
                flags |= SoftwareType.DataProcessing
            elif clause.term == ANALYSIS_SW:
                flags |= SoftwareType.Analysis
            elif clause.term == ACQUISITION_SW:
                flags |= SoftwareType.Acquisition
        elif isinstance(clause, DefClause):
            # Double quotes would terminate the emitted string literal, so
            # they become single quotes. Each square bracket is prefixed with
            # two backslash characters (presumably so the generated doc string
            # carries an escaped bracket -- confirm against the consumer).
            descr = re.sub(
                r"(\[|\])",
                lambda m: "\\\\" + m.group(1),
                str(clause.definition).replace('"', "'"),
            )

    if not name:
        # Fail with a message that identifies the term rather than an opaque
        # TypeError/IndexError from the string handling below.
        raise ValueError(f"term {term.id} has no name to derive a variant from")

    vname = name.translate(_VARIANT_NAME_TRANSLATION).replace("software", "Software")
    vname = segment_pattern.sub(format_name, vname)
    vname = vname[0].upper() + vname[1:]

    if vname[0].isdigit():
        vname = "_" + vname

    return f"""
    #[term(cv=MS, accession={term.id.local}, name="{name}", flags={{{int(flags)}}}, parents={{{json.dumps(parents)}}})]
    #[doc="{name} - {descr}"]
    {vname},"""


def generate_term_enum(terms: List[TermFrame]):
    """Build the source text of the `SoftwareTerm` enum.

    Each term is rendered with `make_entry_for` and the entries are placed, in
    the given order, between the enum's opening line and closing brace.
    """
    variants = "".join(make_entry_for(frame) for frame in terms)
    return "pub enum SoftwareTerm {" + variants + "\n}"


def main():
    """Generate the `SoftwareTerm` enum and write it to standard output.

    Reads the gzip-compressed OBO file at ``./cv/psi-ms.obo.gz``, collects the
    software terms, renders them in sorted id order and writes the UTF-8
    encoded result to the binary stdout stream.
    """
    # Use a context manager so the gzip handle is closed once parsing is done.
    with gzip.open("./cv/psi-ms.obo.gz") as handle:
        cv: OboDoc = fastobo.load(handle)
    software_ids, id_to_clause = collect_software_types(cv)
    sw_terms = [id_to_clause.get(term_id) for term_id in sorted(software_ids)]
    text = generate_term_enum(sw_terms).encode('utf8')
    sys.stdout.buffer.write(text)


if __name__ == "__main__":
    main()