from __future__ import annotations
import errno
import pathlib
import shlex
import subprocess
import typing
from . import defs
from . import vbuild
from . import yaiparser
from .defs import (
VERSION,
Config,
Variant,
VariantError,
)
from .vbuild import update_namedtuple
if typing.TYPE_CHECKING:
from typing import Final, Iterable
class VariantKeyError(VariantError):
    """No variant matches the requested name or alias.

    Raised by get_variant() and get_by_alias() in this module.
    """
class VariantFileError(VariantError):
    """Variant-related data could not be processed.

    Within this module it is raised by list_all_packages() when the
    package-listing command outputs a line that does not have the
    expected number of tab-separated fields.
    """
class VariantRemoteError(VariantError):
    """An error tied to a specific host.

    The exception carries the host name and a message as separate
    attributes; its string form is ``"<hostname>: <msg>"``.
    """

    # The host the error is about.
    hostname: str
    # The description of what went wrong.
    msg: str

    def __init__(self, hostname: str, msg: str) -> None:
        """Record the host name and the error message.

        Args:
            hostname: the host the error is about.
            msg: the description of what went wrong.
        """
        # The base class is initialized without arguments; the string
        # form comes from __str__() below, not from the base class.
        super().__init__()
        self.msg = msg
        self.hostname = hostname

    def __str__(self) -> str:
        """Return the host name and the message separated by a colon."""
        return f"{self.hostname}: {self.msg}"
class VariantDetectError(VariantError):
    """The build variant of the current host could not be detected.

    Raised by detect_variant() when no variant matches, and by the
    file-based detection when a detection file exists but cannot be read.
    """
# A single Config instance created at import time and used as the default
# value of the ``cfg`` parameter of the public functions in this module.
_DEFAULT_CONFIG = Config()

# The encoding used when reading the detection files in _detect_from_files().
# Latin-1 maps every possible byte to a character, so decoding never fails
# no matter what the file actually contains.
SAFEENC = "Latin-1"
def _detect_from_os_release(cfg: Config) -> Variant | None:
    """Try to pick a variant using the fields of /etc/os-release.

    The file is parsed with yaiparser.YAIParser and its ``ID`` and
    ``VERSION_ID`` values are compared against the ``detect.os_id`` and
    ``detect.os_version_regex`` attributes of each variant in
    vbuild.DETECT_ORDER; the first variant that matches both is returned.

    Args:
        cfg: the runtime configuration; its diag() method receives
            the progress messages.

    Returns:
        The first matching variant, or None if the file does not exist,
        if either of the two fields is missing, or if nothing matches.

    Raises:
        OSError: the file could not be read for a reason other than
            it not existing.
    """
    try:
        os_release: Final = yaiparser.YAIParser("/etc/os-release").parse()
    except OSError as err:
        if err.errno != errno.ENOENT:
            raise
        # No os-release file at all, so there is nothing to match against.
        return None

    os_id = os_release.get("ID")
    os_version = os_release.get("VERSION_ID")
    if os_id is None or os_version is None:
        return None

    cfg.diag(f"Matching os-release id {os_id!r} version {os_version!r}")
    for candidate in vbuild.DETECT_ORDER:
        cfg.diag(f"- trying {candidate.name}")
        detect = candidate.detect
        if detect.os_id != os_id:
            continue
        if detect.os_version_regex.match(os_version):
            cfg.diag(" - found it!")
            return candidate

    return None
def _detect_from_files(cfg: Config) -> Variant | None:
    """Try to pick a variant by looking at per-variant detection files.

    For each variant in vbuild.DETECT_ORDER, the file named by its
    ``detect.filename`` is read (decoded using SAFEENC) and each line is
    tested against ``detect.regex``; the first variant with a matching
    line is returned.  A file that does not exist merely means that
    the variant is skipped.

    Args:
        cfg: the runtime configuration; its diag() method receives
            the progress messages.

    Returns:
        The first matching variant, or None if nothing matches.

    Raises:
        VariantDetectError: an OSError other than "no such file" occurred
            while examining a detection file.
    """
    cfg.diag("Trying non-os-release-based heuristics")
    for candidate in vbuild.DETECT_ORDER:
        cfg.diag(f"- trying {candidate.name}")
        detect = candidate.detect
        filename = detect.filename
        try:
            cfg.diag(f" - {filename}")
            contents = pathlib.Path(filename).read_text(encoding=SAFEENC)
            for line in contents.splitlines():
                if detect.regex.match(line):
                    cfg.diag(f" - found it: {line}")
                    return candidate
        except OSError as err:
            if err.errno == errno.ENOENT:
                # A missing file only rules out this particular variant.
                cfg.diag(f" - no {filename}")
                continue
            raise VariantDetectError(
                f"Could not read the {filename} file: {err}",
            ) from err

    return None
def detect_variant(cfg: Config = _DEFAULT_CONFIG) -> Variant:
    """Detect the build variant of the current host.

    Calls vbuild.build_variants(cfg) first, then tries the os-release-based
    detection and, if that finds nothing, the file-based heuristics.

    Args:
        cfg: the runtime configuration; its diag() method receives
            the progress messages.

    Returns:
        The first variant that either detection method comes up with.

    Raises:
        VariantDetectError: neither detection method found a match.
            Errors raised by the detection methods themselves are
            passed through unchanged.
    """
    vbuild.build_variants(cfg)
    cfg.diag("Trying to detect the current hosts's build variant")

    # The order matters: os-release is consulted before the heuristics,
    # and a later detector is only run if the earlier one found nothing.
    for detector in (_detect_from_os_release, _detect_from_files):
        found = detector(cfg)
        if found is not None:
            return found

    raise VariantDetectError("Could not detect the current host's build variant")
def get_all_variants(cfg: Config = _DEFAULT_CONFIG) -> dict[str, Variant]:
    """Return a name-to-variant dictionary of all the known variants.

    Calls vbuild.build_variants(cfg) first.  The result is a new
    dictionary, so adding or removing keys in it does not affect
    vbuild.VARIANTS; the Variant objects themselves are not copied.
    """
    vbuild.build_variants(cfg)
    snapshot = {**vbuild.VARIANTS}
    return snapshot
def get_all_variants_in_order(cfg: Config = _DEFAULT_CONFIG) -> list[Variant]:
    """Return the known variants in the order used for detection.

    Calls vbuild.build_variants(cfg) first.  The result is a new list
    with the same elements, in the same order, as vbuild.DETECT_ORDER.
    """
    vbuild.build_variants(cfg)
    ordered = [*vbuild.DETECT_ORDER]
    return ordered
def get_by_alias(alias: str, cfg: Config = _DEFAULT_CONFIG) -> Variant:
    """Return the variant whose builder alias equals the specified one.

    Calls vbuild.build_variants(cfg) first, then examines the values of
    vbuild.VARIANTS in iteration order and stops at the first match.

    Args:
        alias: the value to compare with each variant's ``builder.alias``.
        cfg: the runtime configuration passed to vbuild.build_variants().

    Raises:
        VariantKeyError: no variant has that alias.
    """
    vbuild.build_variants(cfg)
    match = next(
        (item for item in vbuild.VARIANTS.values() if item.builder.alias == alias),
        None,
    )
    if match is None:
        raise VariantKeyError(f"No variant with alias {alias}")
    return match
def get_variant(name: str, cfg: Config = _DEFAULT_CONFIG) -> Variant:
    """Return the variant registered under the specified name.

    Calls vbuild.build_variants(cfg) first, then looks the name up in
    vbuild.VARIANTS.

    Args:
        name: the key to look up in vbuild.VARIANTS.
        cfg: the runtime configuration passed to vbuild.build_variants().

    Raises:
        VariantKeyError: there is no variant with that name; the original
            KeyError is attached as the cause.
    """
    vbuild.build_variants(cfg)
    try:
        found = vbuild.VARIANTS[name]
    except KeyError as err:
        raise VariantKeyError(f"No variant named {name}") from err
    return found
def list_all_packages(var: Variant, patterns: Iterable[str] | None = None) -> list[defs.OSPackage]:
    """Run the variant's package-listing command and parse its output.

    The command is taken from ``var.commands.package.list_all`` (copied,
    so the variant's own sequence is never modified) and any patterns are
    appended to it as additional arguments.  The output is decoded as
    UTF-8 and each line must consist of exactly four tab-separated fields:
    name, version, architecture, and status.  Only the lines whose status
    field starts with "ii" are kept.

    Args:
        var: the variant that supplies the command to run.
        patterns: optional additional arguments for the command.

    Returns:
        A list of defs.OSPackage objects, one for each line kept,
        all with the status set to "installed".

    Raises:
        VariantFileError: a line of the output does not have exactly
            four fields.
        subprocess.CalledProcessError: the command exited with
            a non-zero status (raised by subprocess.check_output()).
    """
    cmd: Final = [*var.commands.package.list_all]
    if patterns is not None:
        cmd += patterns

    output = subprocess.check_output(cmd, shell=False).decode("UTF-8")

    installed: Final = []
    for line in output.splitlines():
        fields = line.split("\t")
        if len(fields) != 4:
            raise VariantFileError(f"Unexpected line in the '{shlex.join(cmd)}' output: {line!r}")

        pkg_name, pkg_version, pkg_arch, pkg_state = fields
        # Anything that is not in the "ii" state is left out of the result.
        if pkg_state.startswith("ii"):
            installed.append(
                defs.OSPackage(
                    name=pkg_name,
                    version=pkg_version,
                    arch=pkg_arch,
                    status="installed",
                ),
            )

    return installed
# The names made available by ``from <this module> import *``: the ones
# re-exported from the defs and vbuild modules, followed by the public
# functions defined here.
# NOTE(review): the exception classes defined in this module
# (VariantKeyError, VariantFileError, VariantRemoteError, VariantDetectError)
# are not listed - confirm whether that is intentional.
__all__ = (
    "VERSION",
    "Config",
    "Variant",
    "VariantError",
    "detect_variant",
    "get_all_variants",
    "get_all_variants_in_order",
    "get_by_alias",
    "get_variant",
    "list_all_packages",
    "update_namedtuple",
)