subplot 0.9.0

tools for specifying, documenting, and implementing automated acceptance tests for systems and software
Documentation
#!/usr/bin/env python3

import argparse
import glob
import json
import os
import sys
import time

from subprocess import PIPE, DEVNULL, STDOUT


class Runcmd:
    """Run external commands in various ways

    Every external program is started via runcmd_unchecked, which layers
    the environment overrides stored in this object (see setenv) on top
    of the inherited environment.

    When not verbose, the output of commands is captured unless the
    caller says otherwise, and runcmd shows it only if the command
    fails. The progress flag only affects title(). The offline flag is
    stored for callers to consult; this class does not use it itself.
    """

    def __init__(self, verbose, progress, offline):
        self._verbose = verbose
        self._progress = progress
        self.offline = offline
        # Deliberately chosen because it's 12:45 / 13:45 offset from UTC
        # As such it ought to show any TZ related errors if we're lucky.
        self._env = {"TZ": "NZ-CHAT"}

    def _write_msg(self, msg):
        """Write a line to stdout and flush it immediately"""
        sys.stdout.write(f"{msg}\n")
        sys.stdout.flush()

    @staticmethod
    def _as_text(output):
        """Turn captured command output into a string for display

        Output that was not captured is None, captured output is bytes
        unless the caller asked for text. Bytes that are not valid UTF-8
        are replaced rather than hiding the failure being reported.
        """
        if not output:
            return ""
        if isinstance(output, bytes):
            return output.decode("UTF-8", errors="replace")
        return output

    def setenv(self, key, value):
        """Set an environment variable for all commands run from now on"""
        self._env[key] = value

    def msg(self, msg):
        """Write a message, but only when verbose"""
        if self._verbose:
            self._write_msg(msg)

    def title(self, title):
        """Announce a new phase of the test run

        When verbose, the title is written as a banner. Otherwise, if
        progress output was requested, only the bare title is written.
        """
        if self._verbose:
            self._write_msg("")
            self._write_msg("=" * len(title))
            self._write_msg(title)
            self._write_msg("=" * len(title))
        elif self._progress:
            self._write_msg(title)

    def runcmd_unchecked(self, argv, **kwargs):
        """Run a command (generic version)

        Return a subprocess.CompletedProcess. It's the caller's duty to
        check that the command succeeded.

        The keyword arguments are passed on to subprocess.run. They must
        not include "env": the environment is always constructed here.

        All actual execution of other programs happens via this method.
        However, only methods of this class should ever call this
        method.
        """

        # Import "run" here so that no other part of the code can see the
        # symbol.
        from subprocess import run

        self.msg(f"RUN: {argv} {kwargs}")

        # When not verbose, capture output (stderr merged into stdout)
        # unless the caller made their own arrangements.
        if not self._verbose:
            kwargs.setdefault("stdout", PIPE)
            kwargs.setdefault("stderr", STDOUT)

        # A caller-supplied "env" would collide with the one passed below.
        assert "env" not in kwargs, "use setenv() instead of passing env"
        env = dict(os.environ)
        env.update(self._env)

        return run(argv, env=env, **kwargs)

    def runcmd(self, argv, **kwargs):
        """Run a command (generic version)

        If the command fails, show any captured output, then terminate
        the program with the exit code of the command. On success,
        return a subprocess.CompletedProcess.
        """

        p = self.runcmd_unchecked(argv, **kwargs)
        if p.returncode != 0:
            sys.stdout.write(self._as_text(p.stdout))
            sys.stderr.write(self._as_text(p.stderr))
            sys.stderr.write(f"Command {argv} failed\n")
            sys.exit(p.returncode)
        return p

    def runcmd_maybe(self, argv, **kwargs):
        """Run a command if it's available, or quietly do nothing

        Return what runcmd returns if the command was run, otherwise
        None.
        """
        if self.got_command(argv[0]):
            return self.runcmd(argv, **kwargs)
        return None

    def got_command(self, name):
        """Is a command of a given name available?"""
        p = self.runcmd_unchecked(["which", name], stdout=DEVNULL)
        return p.returncode == 0

    def cargo(self, args, **kwargs):
        """Run cargo with arguments."""
        return self.runcmd(["cargo"] + args, **kwargs)

    def cargo_maybe(self, args, **kwargs):
        """Run cargo if the desired subcommand is available

        Return what cargo() returns if the subcommand was run, otherwise
        None.
        """
        if self.got_cargo(args[0]):
            return self.cargo(args, **kwargs)
        return None

    def got_cargo(self, subcommand):
        """Is a cargo subcommand available?"""
        # No check=True here: runcmd already terminates the program, with
        # the command's output, if cargo fails.
        p = self.runcmd(["cargo", "--list"], stdout=PIPE)
        # The first word of every non-empty line is treated as a
        # subcommand name.
        lines = [
            line.split()[0]
            for line in p.stdout.decode("UTF-8").splitlines()
            if line.strip()
        ]
        return subcommand in lines

    def codegen(self, md, template, output, **kwargs):
        """Run the Subplot code generator

        Extra keyword arguments are passed on to cargo().
        """
        self.cargo(
            [
                "run",
                "--package=subplot",
                "--bin=subplot",
                "--",
                f"--resources={os.path.abspath('share')}",
                "codegen",
                f"--template={template}",
                md,
                f"--output={output}",
            ],
            **kwargs,
        )

    def docgen(self, md, template, output, **kwargs):
        """Run the Subplot document generator

        Extra keyword arguments are passed on to cargo().
        """

        # GitLab CI, and other CI engines, runs tests under Docker, which uses
        # an overlay file system, which at least sometimes only has full-second
        # time stamps in file meta data, and sometimes that seems to happen
        # when the kernel needs to flush its inode cache, and so it's not very
        # deterministic. This leads to occasional test failures. To prevent
        # such flaky tests, we wait for a second to make sure the time stamp of
        # the output will be newer than the input has.
        #
        # This is an ugly kluge. It would be possible to check if the sleep is
        # needed, but that kind of code is going to be tricky. Best avoid
        # tricky code in test suites. Test code should be obviously correct.
        time.sleep(1)

        self.cargo(
            [
                "run",
                "--package=subplot",
                "--bin=subplot",
                "--",
                f"--resources={os.path.abspath('share')}",
                "docgen",
                f"--template={template}",
                md,
                f"--output={output}",
            ],
            **kwargs,
        )

    def get_templates(self, filename):
        """Return the names of the templates a subplot specifies

        The names are the keys of "impls" in the JSON metadata that the
        subplot program reports for the file. Terminate the program if
        there are none.
        """
        metadata = self.cargo(
            [
                "run",
                "--quiet",
                "--package=subplot",
                "--bin=subplot",
                "--",
                f"--resources={os.path.abspath('share')}",
                "metadata",
                "-o",
                "json",
                "--merciful",
                filename,
            ],
            stdout=PIPE,
            stderr=PIPE,
        ).stdout.decode("UTF-8")
        metadata = json.loads(metadata)
        impls = metadata.get("impls", {})
        if not impls:
            sys.exit(f"{filename} does not specify a template")
        return list(impls)


def find_files(pattern, pred):
    """Return the files matching a recursive glob pattern that pred accepts

    The predicate is called with each matching path, and only the paths
    for which it returns a true value are kept.
    """
    accepted = []
    for path in glob.glob(pattern, recursive=True):
        if pred(path):
            accepted.append(path)
    return accepted


def check_python(r):
    """Run all checks for Python code

    Run flake8, if it is installed, and then every unit test file
    (*_tests.py) with python3, each from the directory the test file is
    in.
    """
    r.title("checking Python code")

    # Find all Python files anywhere, except those we know aren't proper Python.
    py = find_files(
        "**/*.py",
        lambda f: os.path.basename(f) not in ("template.py", "test.py")
        and "test-outputs" not in f,
    )

    # Test with flake8 if available. The "check" path is passed explicitly,
    # in addition to the files found above.
    r.runcmd_maybe(["flake8", "--config=flake8.ini", "check"] + py)

    # Check formatting with Black. We need to provide the files to Python
    # ourselves.
    # r.runcmd_maybe(["black", "--check"] + py)

    # Find and run unit tests.
    for test in find_files("**/*_tests.py", lambda f: True):
        dirname, filename = os.path.split(test)
        # The directory part is empty for a test file in the current
        # directory, and an empty cwd would prevent the command from
        # starting at all.
        r.runcmd(["python3", filename], cwd=dirname or ".")


def check_shell(r):
    """Run all checks for shell code

    Run shellcheck, if it is installed, on every shell file found. Do
    nothing if there are no shell files.
    """
    r.title("checking shell code")

    # Find all shell files anywhere, except generated test programs.
    sh = find_files(
        "**/*.sh",
        lambda f: os.path.basename(f) != "test.sh" and "test-outputs" not in f,
    )

    # shellcheck treats being run without any files as an error, which
    # would fail the whole test suite for no good reason.
    if not sh:
        r.msg("no shell files found, not running shellcheck")
        return

    r.runcmd_maybe(["shellcheck"] + sh)


def check_rust(r, strict=False, sloppy=False):
    """Run all checks for Rust code

    Always build and test the workspace. Unless sloppy, also run clippy
    (if installed) and check formatting. If strict, clippy warnings are
    errors, and it is an error for clippy to be missing. Sloppy wins
    over strict: when both are set, clippy is not consulted at all.
    """
    r.title("checking Rust code")

    r.runcmd(["cargo", "build", "--workspace", "--all-targets"])

    # Only ask cargo about clippy when clippy is actually wanted, so
    # that the "not found" error below can only mean what it says.
    if not sloppy:
        if r.got_cargo("clippy"):
            argv = [
                "cargo",
                "clippy",
                "--workspace",
                "--all-targets",
            ]
            if strict:
                argv += [
                    "--",
                    "-Dwarnings",
                ]
            r.runcmd(argv)
        elif strict:
            sys.exit("Strict Rust checks specified, but clippy was not found")

    r.runcmd(["cargo", "test", "--workspace"])
    if not sloppy:
        r.runcmd(["cargo", "fmt", "--", "--check"])


def check_subplots(r):
    """Run all Subplots and generate documents for them

    For every subplot found, generate and run a test program for each
    template the subplot specifies (Rust templates are skipped), then
    generate an HTML document using the first template. All generated
    files go into the test-outputs directory.
    """
    outdir = os.path.abspath("test-outputs")
    os.makedirs(outdir, exist_ok=True)

    def wanted(path):
        # Only all-lowercase paths, and nothing from subplotlib or from
        # our own output directory.
        if path != path.lower():
            return False
        return "subplotlib" not in path and "test-outputs" not in path

    candidates = find_files("**/*.subplot", wanted)
    if r.offline:
        candidates = [c for c in candidates if c != "reference.subplot"]

    for path in candidates:
        r.title(f"checking subplot {path}")

        # Commands are run in the directory of the subplot, so they get
        # the file name without the directory part.
        workdir = os.path.dirname(path) or "."
        filename = os.path.basename(path)
        stem = os.path.splitext(filename)[0]

        templates = r.get_templates(path)
        # The document is generated with the first template listed.
        doc_template = templates[0] if templates else None

        for template in templates:
            if template == "rust":
                r.msg(f"Ignoring Rust template in {path}")
                continue

            if template == "bash":
                script = os.path.join(outdir, f"test-{stem}.sh")
                r.codegen(filename, "bash", script, cwd=workdir)
                r.runcmd(["bash", "-x", script], cwd=workdir)
                continue

            if template != "python":
                sys.exit(f"unknown template {template} in {path}")

            script = os.path.join(outdir, f"test-{stem}.py")
            log = os.path.join(outdir, f"test-{stem}.log")

            # Remove test log from previous run, if any.
            if os.path.exists(log):
                os.remove(log)

            bindir = get_bin_dir(r)

            r.codegen(filename, "python", script, cwd=workdir)
            argv = ["python3", script, "--log", log, f"--env=SUBPLOT_DIR={bindir}"]
            result = r.runcmd_unchecked(argv, cwd=workdir)
            if result.returncode == 0:
                continue

            # The test program failed: show the end of its log, if it
            # wrote one, and give up.
            if os.path.exists(log):
                tail(log)
            sys.exit(1)

        html = os.path.join(outdir, stem) + ".html"
        r.docgen(filename, doc_template, html, cwd=workdir)


def tail(filename, numlines=100):
    """Print the last lines of a text file

    At most numlines lines are printed, each indented by four spaces
    and with trailing whitespace removed, after a heading that says how
    many lines there are. A numlines of zero or less prints only the
    heading.
    """
    # Imported here so that this function does not depend on the imports
    # at the top of the file.
    from collections import deque

    # A bounded deque keeps only the most recent lines while the file
    # is read once. Bytes that can't be decoded are replaced: this is
    # used to report a failure and must not hide it behind another one.
    with open(filename, errors="replace") as f:
        lines = deque(f, maxlen=max(numlines, 0))

    print(f"last {len(lines)} of {filename}:")
    for line in lines:
        print(f"    {line.rstrip()}")


def check_tooling(r):
    """Check build environment for tooling the test suite needs

    Terminate the program, naming the first missing command, if any of
    the needed commands can't be found.
    """
    required = ("bash", "cargo", "dot", "plantuml", "rustc", "rustfmt")

    # The generator is lazy, so checking stops at the first missing command.
    missing = next((name for name in required if not r.got_command(name)), None)
    if missing is not None:
        sys.exit(f"can't find {missing}, which is needed for test suite")

    # daemonize may be installed outside PATH, so look in /usr/sbin as well.
    have_daemonize = r.got_command("daemonize") or r.got_command(
        "/usr/sbin/daemonize"
    )
    if not have_daemonize:
        sys.exit(
            "can't find daemonize in PATH or in /usr/sbin, but it's needed for test suite"
        )


def parse_args():
    """Parse the command line of this script

    Return the argparse namespace, with the boolean attributes verbose,
    progress, strict, sloppy, and offline, and the list "what" of things
    to test, which defaults to everything.
    """
    parser = argparse.ArgumentParser()

    # All options are simple on/off flags, so they differ only in their
    # names and help texts.
    flags = [
        (("-v",), {"dest": "verbose", "help": "be verbose"}),
        (("-p",), {"dest": "progress", "help": "print some progress output"}),
        (("--strict",), {"help": "don't allow compiler warnings"}),
        (("--sloppy",), {"help": "don't check formatting or with clippy"}),
        (("--offline",), {"help": "only run tests that can be run offline"}),
    ]
    for names, options in flags:
        parser.add_argument(*names, action="store_true", **options)

    everything = ["tooling", "python", "shell", "rust", "subplots"]
    parser.add_argument(
        "what", nargs="*", default=everything, help=f"what to test: {everything}"
    )

    return parser.parse_args()


def get_bin_dir(r):
    """Return the "debug" directory under cargo's target directory

    The target directory is taken from the output of "cargo metadata".
    """
    argv = ["cargo", "metadata", "--format-version=1", "--frozen", "--no-deps"]
    completed = r.runcmd(argv, check=True, stdout=PIPE)
    target_dir = json.loads(completed.stdout)["target_directory"]
    return os.path.join(target_dir, "debug")


def main():
    """Main program

    Run the requested checks in the order given on the command line.
    Terminate the program at the first name that is not a known check.
    """
    args = parse_args()

    runner = Runcmd(args.verbose, args.progress, args.offline)
    runner.setenv("PYTHONDONTWRITEBYTECODE", "1")

    # Each check is called with the runner as its only argument.
    checks = {
        "python": check_python,
        "shell": check_shell,
        "rust": lambda r: check_rust(r, strict=args.strict, sloppy=args.sloppy),
        "subplots": check_subplots,
        "tooling": check_tooling,
    }

    for what in args.what:
        check = checks.get(what)
        if check is None:
            sys.exit(f"Unknown test {what}")
        check(runner)

    sys.stdout.write("Everything seems to be in order.\n")


# Run the checks only when executed as a script, so that importing this
# file does not start the whole test suite.
if __name__ == "__main__":
    main()