statumen 0.3.0

Statumen whole-slide image reader
Documentation
#!/usr/bin/env python3

from __future__ import annotations

import shutil
import subprocess
import tempfile
from pathlib import Path


ROOT = Path(__file__).resolve().parent
WIDTH = 16
HEIGHT = 12


def run(*args: str) -> None:
    subprocess.run(args, check=True, cwd=ROOT)


def write_ppm(path: Path, pixels: list[tuple[int, int, int]], width: int, height: int) -> None:
    with path.open("wb") as fh:
        fh.write(f"P6\n{width} {height}\n255\n".encode("ascii"))
        for r, g, b in pixels:
            fh.write(bytes((r, g, b)))


def rgb_source_pixels() -> list[tuple[int, int, int]]:
    pixels: list[tuple[int, int, int]] = []
    for y in range(HEIGHT):
        for x in range(WIDTH):
            pixels.append(
                (
                    (x * 17 + y * 11) % 256,
                    (x * 7 + y * 23 + 40) % 256,
                    (x * 29 + y * 5 + 80) % 256,
                )
            )
    return pixels


def write_ycbcr_raw(path: Path, sub_x: int, sub_y: int) -> None:
    y_plane = bytearray()
    cb_plane = bytearray()
    cr_plane = bytearray()

    for y in range(HEIGHT):
        for x in range(WIDTH):
            y_plane.append((x * 13 + y * 19 + 32) % 256)

    for y in range(0, HEIGHT, sub_y):
        for x in range(0, WIDTH, sub_x):
            cb_plane.append((64 + x * 9 + y * 7) % 256)
            cr_plane.append((192 - x * 5 - y * 11) % 256)

    path.write_bytes(bytes(y_plane + cb_plane + cr_plane))


def read_ppm_pixels(path: Path) -> tuple[int, int, bytes]:
    data = path.read_bytes()
    if not data.startswith(b"P6\n"):
        raise ValueError(f"{path} is not a binary PPM")

    cursor = 3
    tokens: list[bytes] = []
    while len(tokens) < 3:
        while cursor < len(data) and chr(data[cursor]).isspace():
            cursor += 1
        if cursor < len(data) and data[cursor] == ord("#"):
            while cursor < len(data) and data[cursor] != ord("\n"):
                cursor += 1
            continue
        start = cursor
        while cursor < len(data) and not chr(data[cursor]).isspace():
            cursor += 1
        tokens.append(data[start:cursor])

    width = int(tokens[0])
    height = int(tokens[1])
    maxval = int(tokens[2])
    if maxval != 255:
        raise ValueError(f"unsupported maxval in {path}: {maxval}")

    while cursor < len(data) and chr(data[cursor]).isspace():
        cursor += 1
    return width, height, data[cursor:]


def ycbcr_triplets_to_rgb(ycbcr_data: bytes) -> list[tuple[int, int, int]]:
    if len(ycbcr_data) % 3 != 0:
        raise ValueError("YCbCr data is not pixel-interleaved")

    pixels: list[tuple[int, int, int]] = []
    for offset in range(0, len(ycbcr_data), 3):
        yy = ycbcr_data[offset]
        cb = ycbcr_data[offset + 1]
        cr = ycbcr_data[offset + 2]
        cb_off = cb - 128
        cr_off = cr - 128
        r = max(0, min(255, yy + ((1402 * cr_off) // 1000)))
        g = max(0, min(255, yy - ((344 * cb_off + 714 * cr_off) // 1000)))
        b = max(0, min(255, yy + ((1772 * cb_off) // 1000)))
        pixels.append((r, g, b))
    return pixels


def build_fixture(
    tmp: Path,
    stem: str,
    compress_args: list[str],
    upsample: bool,
    convert_ycbcr_reference: bool,
) -> None:
    j2k = tmp / f"{stem}.j2k"
    decoded = tmp / f"{stem}.ppm"

    run("opj_compress", *compress_args, "-o", str(j2k))

    decompress_args = ["opj_decompress"]
    if upsample:
        decompress_args.append("-upsample")
    decompress_args.extend(["-i", str(j2k), "-o", str(decoded)])
    run(*decompress_args)
    if convert_ycbcr_reference:
        width, height, pixels = read_ppm_pixels(decoded)
        write_ppm(ROOT / decoded.name, ycbcr_triplets_to_rgb(pixels), width, height)
    else:
        shutil.copy2(decoded, ROOT / decoded.name)
    shutil.copy2(j2k, ROOT / j2k.name)


def main() -> None:
    if shutil.which("opj_compress") is None or shutil.which("opj_decompress") is None:
        raise SystemExit("opj_compress and opj_decompress must be available on PATH")

    with tempfile.TemporaryDirectory() as tmpdir:
        tmp = Path(tmpdir)

        rgb_source = tmp / "rgb_source.ppm"
        write_ppm(rgb_source, rgb_source_pixels(), WIDTH, HEIGHT)

        write_ycbcr_raw(tmp / "ycbcr_444.raw", 1, 1)
        write_ycbcr_raw(tmp / "ycbcr_422.raw", 2, 1)
        write_ycbcr_raw(tmp / "ycbcr_420.raw", 2, 2)

        build_fixture(
            tmp,
            "rgb_nomct",
            ["-i", str(rgb_source), "-I", "-mct", "0", "-r", "5", "-n", "3"],
            upsample=False,
            convert_ycbcr_reference=False,
        )
        build_fixture(
            tmp,
            "rgb_mct",
            ["-i", str(rgb_source), "-I", "-mct", "1", "-r", "5", "-n", "3"],
            upsample=False,
            convert_ycbcr_reference=False,
        )
        build_fixture(
            tmp,
            "ycbcr_444",
            [
                "-i",
                str(tmp / "ycbcr_444.raw"),
                "-F",
                f"{WIDTH},{HEIGHT},3,8,u@1x1:1x1:1x1",
                "-I",
                "-mct",
                "0",
                "-r",
                "5",
                "-n",
                "3",
            ],
            upsample=True,
            convert_ycbcr_reference=True,
        )
        build_fixture(
            tmp,
            "ycbcr_422",
            [
                "-i",
                str(tmp / "ycbcr_422.raw"),
                "-F",
                f"{WIDTH},{HEIGHT},3,8,u@1x1:2x1:2x1",
                "-I",
                "-mct",
                "0",
                "-r",
                "5",
                "-n",
                "3",
            ],
            upsample=True,
            convert_ycbcr_reference=False,
        )
        build_fixture(
            tmp,
            "ycbcr_420",
            [
                "-i",
                str(tmp / "ycbcr_420.raw"),
                "-F",
                f"{WIDTH},{HEIGHT},3,8,u@1x1:2x2:2x2",
                "-I",
                "-mct",
                "0",
                "-r",
                "5",
                "-n",
                "3",
            ],
            upsample=True,
            convert_ycbcr_reference=False,
        )


if __name__ == "__main__":
    main()