from __future__ import annotations
import base64
import struct
import sys
import xml.etree.ElementTree as ET
from pathlib import Path
NS = "http://psi.hupo.org/ms/mzml"
MZNS = f"{{{NS}}}"
def _tag(local: str) -> str:
return f"{MZNS}{local}"
def check(condition: bool, msg: str, errors: list[str]) -> None:
if not condition:
errors.append(msg)
def validate_file(path: str) -> list[str]:
errors: list[str] = []
try:
tree = ET.parse(path)
except ET.ParseError as exc:
return [f"XML parse error: {exc}"]
root = tree.getroot()
local = root.tag.split("}")[-1] if "}" in root.tag else root.tag
check(
local in ("mzML", "indexedmzML"),
f"root element is <{local}>, expected <mzML> or <indexedmzML>",
errors,
)
check(
NS in root.tag,
f"root element namespace missing (got '{root.tag}')",
errors,
)
mzml = root.find(_tag("mzML")) if local == "indexedmzML" else root
if mzml is None:
errors.append("no <mzML> element found")
return errors
for req in ("fileDescription", "softwareList", "instrumentConfigurationList",
"dataProcessingList", "run"):
check(
mzml.find(_tag(req)) is not None,
f"required element <{req}> is absent",
errors,
)
run = mzml.find(_tag("run"))
if run is None:
return errors
spectrum_list = run.find(_tag("spectrumList"))
chromatogram_list = run.find(_tag("chromatogramList"))
check(
spectrum_list is not None or chromatogram_list is not None,
"<run> must contain <spectrumList> or <chromatogramList>",
errors,
)
if spectrum_list is None:
return errors
declared = spectrum_list.get("count", "")
spectra = spectrum_list.findall(_tag("spectrum"))
try:
check(
int(declared) == len(spectra),
f"spectrumList count={declared} but found {len(spectra)} <spectrum> elements",
errors,
)
except ValueError:
errors.append(f"spectrumList count attribute is not an integer: {declared!r}")
for spec in spectra:
idx = spec.get("index", "<missing>")
sid = spec.get("id", "<missing>")
prefix = f"spectrum index={idx} id={sid!r}:"
check(spec.get("index") is not None, f"{prefix} missing 'index' attribute", errors)
check(spec.get("id") is not None, f"{prefix} missing 'id' attribute", errors)
check(spec.get("defaultArrayLength") is not None,
f"{prefix} missing 'defaultArrayLength' attribute", errors)
dal_str = spec.get("defaultArrayLength", "0")
try:
dal = int(dal_str)
except ValueError:
errors.append(f"{prefix} defaultArrayLength is not an integer: {dal_str!r}")
dal = 0
cv_accessions = {
cv.get("accession", "")
for cv in spec.findall(_tag("cvParam"))
}
has_ms_level = any(
cv.get("accession", "") == "MS:1000511"
for cv in spec.findall(_tag("cvParam"))
)
check(has_ms_level, f"{prefix} missing MS:1000511 (ms level) cvParam", errors)
has_spectrum_type = bool(
cv_accessions & {"MS:1000579", "MS:1000580", "MS:1000581",
"MS:1000526", "MS:1000527", "MS:1000528"}
)
scan_cv = {
cv.get("accession", "")
for scan in spec.findall(f".//{_tag('scan')}")
for cv in scan.findall(_tag("cvParam"))
}
has_spectrum_mode = bool(
(cv_accessions | scan_cv) & {"MS:1000127", "MS:1000128"}
)
check(
has_spectrum_type or has_ms_level,
f"{prefix} no spectrum type CV param (MS1 / MSn / SRM ...)",
errors,
)
array_list = spec.find(_tag("binaryDataArrayList"))
if array_list is not None:
arrays = array_list.findall(_tag("binaryDataArray"))
for arr in arrays:
arr_accs = {cv.get("accession", "") for cv in arr.findall(_tag("cvParam"))}
bin_elem = arr.find(_tag("binary"))
is_32 = "MS:1000521" in arr_accs is_64 = "MS:1000514" in arr_accs check(
is_32 or is_64,
f"{prefix} binaryDataArray missing float-precision CV param",
errors,
)
is_b64 = "MS:1000514" not in arr_accs or True no_compress = "MS:1000576" in arr_accs zlib_compress = "MS:1000574" in arr_accs
check(
no_compress or zlib_compress,
f"{prefix} binaryDataArray missing compression CV param",
errors,
)
if bin_elem is not None and bin_elem.text:
try:
raw_bytes = base64.b64decode(bin_elem.text.strip())
if zlib_compress:
import zlib
raw_bytes = zlib.decompress(raw_bytes)
item_size = 4 if is_32 else 8
if item_size > 0:
n_items = len(raw_bytes) // item_size
check(
n_items == dal,
f"{prefix} binary array decoded to {n_items} values, "
f"expected {dal} (defaultArrayLength)",
errors,
)
except Exception as exc:
errors.append(f"{prefix} binary data decode error: {exc}")
elif dal > 0 and (bin_elem is None or not bin_elem.text):
errors.append(
f"{prefix} defaultArrayLength={dal} but binary element is empty"
)
return errors
def main() -> int:
if len(sys.argv) < 2:
print("Usage: validate_mzml.py <file.mzML> [file2.mzML ...]", file=sys.stderr)
return 1
overall_ok = True
for path in sys.argv[1:]:
errs = validate_file(path)
if errs:
overall_ok = False
print(f"FAIL {path}")
for e in errs:
print(f" - {e}")
else:
p = Path(path)
n_spectra = "(unknown)"
try:
tree = ET.parse(path)
root = tree.getroot()
local = root.tag.split("}")[-1] if "}" in root.tag else root.tag
mzml = root.find(f"{{{NS}}}mzML") if local == "indexedmzML" else root
if mzml is not None:
run = mzml.find(f"{{{NS}}}run")
if run is not None:
sl = run.find(f"{{{NS}}}spectrumList")
if sl is not None:
n_spectra = sl.get("count", n_spectra)
except Exception:
pass
print(f"OK {path} ({n_spectra} spectra)")
return 0 if overall_ok else 1
if __name__ == "__main__":
sys.exit(main())