from __future__ import annotations
import argparse
import json
import pathlib
import shlex
import subprocess
import sys
import typing
from . import defs
from . import variant
from . import vbuild
if typing.TYPE_CHECKING:
    # These names are only needed by static type checkers; the annotations
    # that use them are never evaluated at run time because of the
    # `from __future__ import annotations` line at the top of the file.
    from typing import Any, Callable, Final

    # Alias for the subparsers action type of `argparse`.  It has to stay
    # within this guard: the private `argparse._SubParsersAction` class is
    # only subscriptable from the point of view of a type checker.
    SubPAction: Final = argparse._SubParsersAction[argparse.ArgumentParser]

# The (category, command) pairs that `cmd_command_list()` displays as "..."
# instead of printing out the full command line.
CMD_LIST_BRIEF: Final = [
    ("pkgfile", "install"),
]

# Where `repo_add_deb()` copies the APT sources list file.
_PATH_APT_SOURCES = pathlib.Path("/etc/apt/sources.list.d")
# Where `repo_add_deb()` copies the APT repository keyring.
_PATH_APT_KEYRINGS = pathlib.Path("/usr/share/keyrings")
# Where `repo_add_yum()` copies the RPM signing key.
_PATH_RPM_GPG = pathlib.Path("/etc/pki/rpm-gpg")
# Where `repo_add_yum()` copies the Yum repository definition.
_PATH_YUM_REPOS = pathlib.Path("/etc/yum.repos.d")
# `repo_add_yum()` only imports the key if this program is present.
_PATH_PROG_RPMKEYS = pathlib.Path("/usr/bin/rpmkeys")
def cmd_detect(cfg: defs.Config) -> None:
    """Detect the build variant of the current host and print its name.

    On a detection failure the error message goes to the standard error
    stream and the program exits with code 1.
    """
    try:
        detected_name = variant.detect_variant(cfg=cfg).name
    except variant.VariantError as exc:
        print(str(exc), file=sys.stderr)
        raise SystemExit(1)

    print(detected_name)
def copy_file(cfg: defs.Config, src: pathlib.Path, dstdir: pathlib.Path) -> None:
    """Copy a file into a directory using the `install` program.

    The copy keeps the base name of the source file, is owned by
    root:root, and has its permissions set to 0644.

    Args:
        cfg: the runtime configuration; only its `diag()` method is used.
        src: the path to the file to copy.
        dstdir: the directory to place the copy in.

    Raises:
        variant.VariantFileError: the `install` program reported a failure.
    """
    mode: Final = "0644"
    dst: Final = dstdir / src.name
    cfg.diag(f"{src} -> {dst} [{mode}]")

    # The "--" guards against file names that start with a dash.
    install_cmd = ["install", "-o", "root", "-g", "root", "-m", mode, "--", src, dst]
    try:
        subprocess.check_call(install_cmd, shell=False)
    except subprocess.CalledProcessError as err:
        raise variant.VariantFileError(f"Could not copy {src} over to {dst}: {err}") from err
def repo_name_with_extension(cfg: defs.Config, path: pathlib.Path) -> str:
    """Insert the repository type's extension before the file name suffix.

    The result is the stem of the file name, followed by
    `cfg.repotype.extension`, followed by the original suffix.
    Any directory part of `path` is dropped.

    Raises:
        variant.VariantFileError: the file name does not have exactly
            one suffix.
    """
    if len(path.suffixes) == 1:
        return f"{path.stem}{cfg.repotype.extension}{path.suffix}"

    raise variant.VariantFileError(
        f"Unexpected repository file name without an extension: {path}",
    )
def repo_add_deb(cfg: defs.Config, var: defs.Variant, vardir: pathlib.Path) -> None:
    """Set up the APT repository described by the variant.

    In order: install the packages that the repository definition lists as
    required, copy the sources list file and the keyring out of `vardir`
    into the system APT directories, and run `apt-get update`.

    Raises:
        variant.VariantFileError: any of the invoked programs failed.
    """
    assert isinstance(var.repo, defs.DebRepo)

    prereq_cmd = var.commands.package.install + var.repo.req_packages
    try:
        subprocess.check_call(prereq_cmd, shell=False)
    except subprocess.CalledProcessError as err:
        raise variant.VariantFileError(
            f"Could not install the required packages {' '.join(var.repo.req_packages)}: {err}",
        ) from err

    # The sources file is picked according to the selected repository type.
    sources_file = vardir / repo_name_with_extension(cfg, pathlib.Path(var.repo.sources))
    copy_file(cfg, sources_file, _PATH_APT_SOURCES)

    keyring_file = vardir / pathlib.Path(var.repo.keyring).name
    copy_file(cfg, keyring_file, _PATH_APT_KEYRINGS)

    try:
        subprocess.check_call(["apt-get", "update"], shell=False)
    except subprocess.CalledProcessError as err:
        raise variant.VariantFileError(f"Could not update the APT database: {err}") from err
def repo_add_yum(cfg: defs.Config, var: defs.Variant, vardir: pathlib.Path) -> None:
    """Set up the Yum repository described by the variant.

    In order:
    - install the `ca-certificates` package with any `storpool-*`
      repositories disabled
    - copy the repository definition file from `vardir` into
      `/etc/yum.repos.d`
    - copy the signing key from `vardir` into `/etc/pki/rpm-gpg`
    - import that key into the RPM database if `/usr/bin/rpmkeys` exists
    - clean the Yum metadata for the `storpool-<repotype>` repository

    Args:
        cfg: the runtime configuration (diagnostics and repository type).
        var: the detected build variant; its `repo` must be a `YumRepo`.
        vardir: the directory holding the files for this variant.

    Raises:
        variant.VariantFileError: any of the invoked programs failed.
    """
    assert isinstance(var.repo, defs.YumRepo)

    try:
        subprocess.check_call(
            [
                "yum",
                # No shell is involved here, so the glob must be passed
                # unquoted: a quote character would reach yum verbatim and
                # the pattern would not match any StorPool repository.
                # NOTE(review): presumably the StorPool repositories are
                # disabled so that a stale definition cannot break this step.
                "--disablerepo=storpool-*",
                "install",
                "-q",
                "-y",
                "ca-certificates",
            ],
            shell=False,
        )
    except subprocess.CalledProcessError as err:
        raise variant.VariantFileError(
            f"Could not install the required ca-certificates package: {err}",
        ) from err

    copy_file(
        cfg,
        vardir / repo_name_with_extension(cfg, pathlib.Path(var.repo.yumdef)),
        _PATH_YUM_REPOS,
    )

    keyring_name = pathlib.Path(var.repo.keyring).name
    copy_file(cfg, vardir / keyring_name, _PATH_RPM_GPG)

    # Importing the key is skipped when the rpmkeys tool is not installed.
    if _PATH_PROG_RPMKEYS.is_file():
        try:
            subprocess.check_call(
                ["rpmkeys", "--import", _PATH_RPM_GPG / keyring_name],
                shell=False,
            )
        except subprocess.CalledProcessError as err:
            raise variant.VariantFileError(f"Could not import the RPM PGP keys: {err}") from err

    # Only touch the metadata of the repository that was just added.
    try:
        subprocess.check_call(
            [
                "yum",
                "--disablerepo=*",
                f"--enablerepo=storpool-{cfg.repotype.name}",
                "clean",
                "metadata",
            ],
            shell=False,
        )
    except subprocess.CalledProcessError as err:
        raise variant.VariantFileError(
            f"Could not clean the Yum repository metadata: {err}",
        ) from err
def repo_add(cfg: defs.Config) -> None:
    """Install the repository configuration for the detected variant.

    The files are looked up in the subdirectory of `cfg.repodir` named
    after the detected variant.  Nothing is done if the variant's
    repository is neither a `DebRepo` nor a `YumRepo`.

    Raises:
        defs.VariantConfigError: there is no directory for this variant.
    """
    assert cfg.repodir is not None

    detected: Final = variant.detect_variant(cfg)
    vardir: Final = cfg.repodir / detected.name
    if not vardir.is_dir():
        raise defs.VariantConfigError(f"No {vardir} directory")

    repo = detected.repo
    if isinstance(repo, defs.DebRepo):
        repo_add_deb(cfg, detected, vardir)
        return
    if isinstance(repo, defs.YumRepo):
        repo_add_yum(cfg, detected, vardir)
def cmd_repo_add(cfg: defs.Config) -> None:
    """Handle the `repo add` subcommand.

    Runs `repo_add()`; a `variant.VariantError` is reported on the standard
    error stream and turned into exit code 1.
    """
    try:
        repo_add(cfg)
    except variant.VariantError as exc:
        print(str(exc), file=sys.stderr)
        raise SystemExit(1)
def command_find(cfg: defs.Config, var: defs.Variant) -> list[str]:
    """Look up a command in the variant's definition by its dotted name.

    `cfg.command` is split on dots and each component selects a field of
    the current named tuple, starting from `var.commands`.  The walk must
    end at a list, which is returned as-is.

    Raises:
        defs.VariantConfigError: a component does not name a field, there
            are components left over after a list was reached, or the name
            stops at a named tuple instead of a list.
    """
    assert cfg.command is not None

    node = var.commands
    for comp in cfg.command.split("."):
        # Only named tuples may be descended into.
        if not isinstance(node, tuple):
            raise defs.VariantConfigError("Too many command components")

        fields: tuple[str, ...] = node._fields
        if comp not in fields:
            raise defs.VariantConfigError(
                f"Invalid command component '{comp}', should be one of {' '.join(fields)}",
            )

        node = getattr(node, comp)

    if isinstance(node, list):
        return node

    fields = node._fields
    raise defs.VariantConfigError(
        f"Incomplete command specification, should continue with one of {' '.join(fields)}",
    )
def command_run(cfg: defs.Config) -> None:
    """Run a command from the detected variant with extra arguments.

    The command named by `cfg.command` is looked up for the detected
    variant and `cfg.args` is appended to it.  In no-operation mode the
    shell-quoted command line is printed instead of being executed.

    Raises:
        variant.VariantFileError: the command exited with an error.
    """
    assert cfg.args is not None

    base_cmd = command_find(cfg, variant.detect_variant(cfg=cfg))
    cmd: Final = base_cmd + cfg.args
    cmdstr: Final = shlex.join(cmd)
    cfg.diag(f"About to run `{cmdstr}`")

    if not cfg.noop:
        try:
            subprocess.check_call(cmd, shell=False)
        except subprocess.CalledProcessError as err:
            raise variant.VariantFileError(f"Could not run `{cmdstr}`: {err}") from err
        return

    print(cmdstr)
def cmd_command_list(cfg: defs.Config) -> None:
    """Print the commands defined for the detected variant, one per line.

    Categories and the commands within them are listed in sorted order as
    `category.command: shell-quoted command line`.  The commands listed in
    `CMD_LIST_BRIEF` are shown as "..." only.
    """
    detected: Final = variant.detect_variant(cfg=cfg)

    for cat_name in sorted(detected.commands._fields):
        category = getattr(detected.commands, cat_name)
        for cmd_name in sorted(category._fields):
            command = getattr(category, cmd_name)
            if (cat_name, cmd_name) in CMD_LIST_BRIEF:
                result = ["..."]
            else:
                result = command
            print(f"{cat_name}.{cmd_name}: {shlex.join(result)}")
def cmd_command_run(cfg: defs.Config) -> None:
    """Handle the `command run` subcommand.

    Runs `command_run()`; a `variant.VariantError` is reported on the
    standard error stream and turned into exit code 1.
    """
    try:
        command_run(cfg)
    except variant.VariantError as exc:
        print(str(exc), file=sys.stderr)
        raise SystemExit(1)
def cmd_features(_cfg: defs.Config) -> None:
    """Print the single "Features: ..." line for this tool.

    The line lists the `repo` feature version, the program version, and
    the major.minor version of the data format.  The configuration is not
    used.
    """
    features_line = (
        f"Features: repo=0.2 variant={defs.VERSION} "
        f"format={defs.FORMAT_VERSION[0]}.{defs.FORMAT_VERSION[1]}"
    )
    print(features_line)
def cmd_show(cfg: defs.Config) -> None:
    """Print information about one or all build variants as JSON.

    `cfg.command` selects what to show: "all" for every known variant
    along with the detection order, "current" for the variant detected on
    this host, or the name of a specific variant.  An unknown name makes
    the program exit with an error message.
    """
    vbuild.build_variants(cfg)

    def get_data() -> Any:
        """Build the JSON-ready structure for the requested variant(s)."""
        # The part of the output that is the same for all kinds of queries.
        preamble = {
            "format": {
                "version": {
                    "major": defs.FORMAT_VERSION[0],
                    "minor": defs.FORMAT_VERSION[1],
                },
            },
            "version": defs.VERSION,
        }

        if cfg.command == "all":
            return defs.jsonify(
                {
                    **preamble,
                    "variants": vbuild.VARIANTS,
                    "order": [item.name for item in vbuild.DETECT_ORDER],
                },
            )

        assert cfg.command is not None
        selected: defs.Variant | None
        if cfg.command == "current":
            selected = variant.detect_variant(cfg)
        else:
            selected = vbuild.VARIANTS.get(cfg.command)

        if selected is None:
            sys.exit(f"Invalid build variant '{cfg.command}'")

        return defs.jsonify({**preamble, "variant": selected})

    output = json.dumps(get_data(), sort_keys=True, indent=2)
    print(output)
def parse_arguments() -> tuple[defs.Config, Callable[[defs.Config], None]]:
    """Parse the command-line arguments of the `storpool_variant` tool.

    The recognized subcommands are `command list`, `command run`,
    `detect`, `features`, `repo add`, and `show`.

    Returns:
        The runtime configuration built from the arguments and the handler
        function for the selected subcommand.

    Exits the program with an error message if no subcommand that has
    a handler was specified.
    """
    parser: Final = argparse.ArgumentParser(prog="storpool_variant")
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        default=False,
        help="verbose operation; display diagnostic output",
    )

    subp = parser.add_subparsers()

    # command list | command run [-N] command [args...]
    p_cmd = subp.add_parser("command", help="Distribution-specific commands")
    subp_cmd = p_cmd.add_subparsers()

    p_subcmd = subp_cmd.add_parser("list", help="List the distribution-specific commands")
    p_subcmd.set_defaults(func=cmd_command_list)

    p_subcmd = subp_cmd.add_parser("run", help="Run a distribution-specific command")
    p_subcmd.add_argument(
        "-N",
        "--noop",
        action="store_true",
        help="display the command instead of executing it",
    )
    p_subcmd.add_argument("command", type=str, help="The identifier of the command to run")
    p_subcmd.add_argument("args", type=str, nargs="*", help="Arguments to pass to the command")
    p_subcmd.set_defaults(func=cmd_command_run)

    # detect
    p_cmd = subp.add_parser("detect", help="Detect the build variant for the current host")
    p_cmd.set_defaults(func=cmd_detect)

    # features
    p_cmd = subp.add_parser("features", help="Display the features supported by storpool_variant")
    p_cmd.set_defaults(func=cmd_features)

    # repo add -d repodir [-t repotype]
    p_cmd = subp.add_parser("repo", help="StorPool repository-related commands")
    subp_cmd = p_cmd.add_subparsers()

    p_subcmd = subp_cmd.add_parser("add", help="Install the StorPool repository configuration")
    p_subcmd.add_argument(
        "-d",
        "--repodir",
        type=pathlib.Path,
        required=True,
        help="The path to the directory with the repository configuration",
    )
    p_subcmd.add_argument(
        "-t",
        "--repotype",
        type=str,
        default=defs.REPO_TYPES[0].name,
        choices=[item.name for item in defs.REPO_TYPES],
        # Let argparse fill in the actual default so that the help text
        # cannot drift away from the list of repository types.
        help="The type of repository to add (default: %(default)s)",
    )
    p_subcmd.set_defaults(func=cmd_repo_add)

    # show name
    p_cmd = subp.add_parser("show", help="Display information about a build variant")
    p_cmd.add_argument(
        "name",
        type=str,
        help=(
            "the name of the build variant to query, 'all' for all, or "
            "'current' for the one detected"
        ),
    )
    p_cmd.set_defaults(func=cmd_show)

    args: Final = parser.parse_args()
    # The subparsers are optional, so e.g. a bare `storpool_variant` or
    # `storpool_variant command` leaves no handler function set.
    if getattr(args, "func", None) is None:
        sys.exit("No command specified")

    # Most attributes only exist for some subcommands, hence the getattr()
    # calls with defaults.  The "command" field is either the identifier
    # given to `command run` or the variant name given to `show`.
    return (
        defs.Config(
            args=getattr(args, "args", None),
            command=getattr(args, "command", getattr(args, "name", None)),
            noop=bool(getattr(args, "noop", False)),
            repodir=getattr(args, "repodir", None),
            repotype=next(rtype for rtype in defs.REPO_TYPES if rtype.name == args.repotype)
            if hasattr(args, "repotype")
            else defs.REPO_TYPES[0],
            verbose=args.verbose,
        ),
        args.func,
    )
def main() -> None:
    """Parse the command line and invoke the selected subcommand handler."""
    config, handler = parse_arguments()
    handler(config)
# Only run the command-line tool when this module is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()