import hashlib
import os
import platform
import shutil
import subprocess
import sys
import tarfile
import zipfile
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.request import urlretrieve
# GitHub "owner/name" slug used to build release download URLs.
REPO = "kreuzberg-dev/alef"
# Maps a lower-cased (platform.system(), platform.machine()) pair to the
# release asset file name and its archive format ("tar.gz" or "zip").
# There is deliberately no ("darwin", "x86_64") entry: _detect_platform()
# rejects that combination before any lookup happens.
_PLATFORM_MAP: dict[tuple[str, str], tuple[str, str]] = {
    ("linux", "x86_64"): ("alef-x86_64-unknown-linux-gnu.tar.gz", "tar.gz"),
    ("linux", "aarch64"): ("alef-aarch64-unknown-linux-gnu.tar.gz", "tar.gz"),
    ("darwin", "arm64"): ("alef-aarch64-apple-darwin.tar.gz", "tar.gz"),
    ("windows", "amd64"): ("alef-x86_64-pc-windows-gnu.zip", "zip"),
}
def _detect_platform() -> tuple[str, str]:
    """Return the lower-cased ``(system, machine)`` pair of the running host.

    Raises:
        SystemExit: If the host is macOS on x86_64, which has no pre-built
            binary.
    """
    detected = (platform.system().lower(), platform.machine().lower())
    if detected == ("darwin", "x86_64"):
        raise SystemExit("macOS x86_64 is not supported by alef pre-built binaries")
    return detected
def _asset_name_for(system: str, machine: str) -> tuple[str, str]:
    """Look up the release asset for a platform.

    Args:
        system: Lower-cased operating system name.
        machine: Lower-cased CPU architecture name.

    Returns:
        ``(asset_file_name, archive_format)`` taken from ``_PLATFORM_MAP``.

    Raises:
        SystemExit: If the pair has no entry in ``_PLATFORM_MAP``.
    """
    entry = _PLATFORM_MAP.get((system, machine))
    if entry is None:
        raise SystemExit(f"Unsupported platform: {system}/{machine}")
    return entry
def _hooks_dir() -> Path:
    """Return the directory that contains this script file."""
    script_path = Path(__file__)
    return script_path.parent
def _parse_quoted(value: str) -> str:
    """Extract a scalar from the right-hand side of a ``key = value`` line.

    Surrounding whitespace and one layer of single or double quotes are
    removed. A trailing ``# comment`` is dropped as well; previously it
    ended up inside the returned value (``"1.2.3" # note`` came back as
    ``1.2.3" # note``).

    This is a minimal reader, not a TOML parser: escape sequences inside
    quoted strings are not interpreted.

    Args:
        value: Raw text found after the ``=`` sign.

    Returns:
        The unquoted value, or ``""`` when nothing is left.
    """
    text = value.strip()
    if text[:1] in ('"', "'"):
        closing = text.find(text[0], 1)
        if closing != -1:
            # Everything after the closing quote (e.g. a comment) is ignored.
            return text[1:closing]
        # Unterminated quote: fall back to just dropping stray quote marks.
        return text.strip('"').strip("'")
    # Bare value: a '#' can only start a comment here.
    return text.partition("#")[0].strip().strip('"').strip("'")
def _read_cargo_version(cargo_toml: Path) -> str | None:
in_workspace_package = False
in_package = False
for line in cargo_toml.read_text().splitlines():
stripped = line.strip()
if stripped.startswith("["):
in_workspace_package = stripped == "[workspace.package]"
in_package = stripped == "[package]"
continue
if (in_workspace_package or in_package) and stripped.startswith("version"):
_, _, val = stripped.partition("=")
return _parse_quoted(val)
return None
def _find_consumer_alef_toml() -> Path:
    """Locate the nearest ``alef.toml`` at or above the working directory.

    The resolved current directory is checked first, then each ancestor in
    turn up to the filesystem root.

    Returns:
        Path to the first ``alef.toml`` that is a regular file.

    Raises:
        SystemExit: If no such file exists anywhere on that chain.
    """
    start = Path.cwd().resolve()
    candidates = (directory / "alef.toml" for directory in (start, *start.parents))
    found = next((path for path in candidates if path.is_file()), None)
    if found is None:
        raise SystemExit(f"Could not find alef.toml searching upward from {start}")
    return found
def _parse_alef_toml(alef_toml: Path) -> tuple[str | None, str | None, str | None]:
workspace_alef_version: str | None = None
version_from: str | None = None
inline_version: str | None = None
current_section: str | None = None
for raw in alef_toml.read_text().splitlines():
stripped = raw.strip()
if stripped.startswith("["):
current_section = stripped.strip("[]")
continue
if current_section == "workspace" and stripped.startswith("alef_version"):
_, _, val = stripped.partition("=")
workspace_alef_version = _parse_quoted(val)
elif current_section == "crate" and stripped.startswith("version_from"):
_, _, val = stripped.partition("=")
version_from = _parse_quoted(val)
elif current_section is None and stripped.startswith("version") and inline_version is None:
_, _, val = stripped.partition("=")
inline_version = _parse_quoted(val)
return workspace_alef_version, version_from, inline_version
def _resolve_version_from(alef_toml: Path, version_from: str) -> str | None:
cargo_toml = alef_toml.parent / version_from
if cargo_toml.is_file():
return _read_cargo_version(cargo_toml)
return None
def _version() -> str:
    """Determine which alef version the consuming project asks for.

    Sources are tried in order of precedence:

    1. ``alef_version`` in ``[workspace]``
    2. the version read from the manifest named by ``[crate].version_from``
    3. the top-level ``version`` key

    Raises:
        SystemExit: If none of the sources yields a non-empty value.
    """
    config = _find_consumer_alef_toml()
    pinned, manifest_ref, top_level = _parse_alef_toml(config)
    if pinned:
        return pinned
    from_manifest = _resolve_version_from(config, manifest_ref) if manifest_ref else None
    chosen = from_manifest or top_level
    if not chosen:
        raise SystemExit(
            "Could not resolve alef version: no [workspace].alef_version, no [crate].version_from, no top-level version"
        )
    return chosen
def _expected_checksum(asset_name: str) -> str | None:
    """Look up the pinned SHA-256 digest for a release asset.

    ``checksums.txt`` next to this script is read as ``<digest> <name>``
    lines; blank lines and ``#`` comments are skipped.

    Two forms produced by common tooling are now accepted. The ``*`` that
    ``sha256sum`` puts before the file name in binary mode is ignored, and
    the digest is lower-cased so it compares equal to
    ``hashlib``'s ``hexdigest()``. Before, a ``*name`` entry never matched,
    so the caller silently skipped verification.

    Args:
        asset_name: File name of the asset as listed in the checksum file.

    Returns:
        The lower-case hex digest, or ``None`` when the file is missing or
        has no entry for ``asset_name``.
    """
    checksums_file = _hooks_dir() / "checksums.txt"
    if not checksums_file.is_file():
        return None
    for raw in checksums_file.read_text(encoding="utf-8").splitlines():
        stripped = raw.strip()
        if not stripped or stripped.startswith("#"):
            continue
        parts = stripped.split()
        if len(parts) != 2:
            continue
        digest, listed_name = parts
        if listed_name.removeprefix("*") == asset_name:
            return digest.lower()
    return None
def _sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at ``path``.

    The file is read in 64 KiB pieces so it is never held in memory whole.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while piece := stream.read(65536):
            digest.update(piece)
    return digest.hexdigest()
def _cache_dir(version: str) -> Path:
    """Return the per-version cache directory for downloaded binaries.

    The base is ``$XDG_CACHE_HOME`` when that variable is set to a
    non-empty value, otherwise ``~/.cache``. An empty variable is treated
    as unset, as the XDG Base Directory Specification requires; before,
    it produced ``Path("")`` and the cache landed in the current directory.

    Args:
        version: The alef version; used as the last path component.

    Returns:
        ``<base>/alef-hooks/<version>``. The directory is not created here.
    """
    configured = os.environ.get("XDG_CACHE_HOME", "")
    base = Path(configured) if configured else Path.home() / ".cache"
    return base / "alef-hooks" / version
def _binary_name() -> str:
    """Return the executable file name: ``alef.exe`` on Windows, else ``alef``."""
    if platform.system().lower() == "windows":
        return "alef.exe"
    return "alef"
def _download_and_extract(version: str, asset_name: str, fmt: str, cache: Path) -> None:
    """Download a release archive, verify it, and unpack it into ``cache``.

    The archive is fetched from the GitHub release tagged ``v<version>``.
    When ``_expected_checksum`` has a digest for the asset, the download
    must match it; with no digest, verification is skipped. The archive
    file is always removed afterwards, including when the download,
    verification or extraction fails, so no partial or corrupt archive
    stays in the cache.

    Args:
        version: Release version, without the leading ``v``.
        asset_name: File name of the release asset.
        fmt: ``"tar.gz"`` for a gzipped tarball; anything else is treated
            as a zip file.
        cache: Directory to download into and extract under; created when
            missing.

    Raises:
        SystemExit: On an HTTP or network error, a checksum mismatch, or an
            archive that cannot be unpacked.
    """
    url = f"https://github.com/{REPO}/releases/download/v{version}/{asset_name}"
    archive = cache / asset_name
    cache.mkdir(parents=True, exist_ok=True)
    print(f"[alef-hook] Downloading {url}", file=sys.stderr)
    try:
        try:
            urlretrieve(url, archive)
        except HTTPError as exc:
            raise SystemExit(
                f"Failed to download {asset_name} (HTTP {exc.code})\n {url}\nEnsure v{version} release exists with assets."
            ) from None
        except URLError as exc:
            msg = f"Network error downloading {asset_name}: {exc.reason}"
            raise SystemExit(msg) from None

        expected = _expected_checksum(asset_name)
        if expected is not None:
            actual = _sha256(archive)
            # hexdigest() is lower-case; accept upper-case pinned digests.
            if actual != expected.lower():
                msg = f"Checksum mismatch for {asset_name}: expected {expected}, got {actual}"
                raise SystemExit(msg)

        try:
            if fmt == "tar.gz":
                with tarfile.open(archive, "r:gz") as tf:
                    # Uses tarfile's "data" extraction filter.
                    tf.extractall(cache, filter="data")
            else:
                with zipfile.ZipFile(archive, "r") as zf:
                    zf.extractall(cache)
        except (tarfile.TarError, zipfile.BadZipFile, EOFError) as exc:
            msg = f"Failed to extract {asset_name}: {exc}"
            raise SystemExit(msg) from None
    finally:
        # Runs on success and on every failure path above.
        archive.unlink(missing_ok=True)
def _system_binary_matches(version: str) -> Path | None:
    """Return an ``alef`` found on ``PATH`` if it reports the wanted version.

    The binary is run with ``--version`` (5 second timeout). The last
    whitespace-separated token of its stdout, with any leading ``v``
    characters removed, is compared to ``version``.

    Args:
        version: The exact version string required.

    Returns:
        Path to the binary on an exact match; ``None`` when no binary is
        found, it cannot be run, it times out, it exits non-zero, or the
        version differs.
    """
    found = shutil.which(_binary_name())
    if found is None:
        return None
    try:
        probe = subprocess.run(
            [found, "--version"],
            check=False,
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (OSError, subprocess.TimeoutExpired):
        return None
    if probe.returncode != 0:
        return None
    tokens = probe.stdout.strip().split()
    reported = tokens[-1].lstrip("v") if tokens else ""
    return Path(found) if reported == version else None
def _resolve_binary() -> Path:
    """Return the path of an ``alef`` executable of the required version.

    A binary on ``PATH`` that already reports the required version is
    preferred. Otherwise the cached copy under
    ``<cache>/<asset stem>/<binary name>`` is used, and downloaded first
    when it is missing. The cached file is made executable if it is not.

    Raises:
        SystemExit: If the version cannot be resolved, the platform is
            unsupported, the download fails, or the unpacked archive does
            not contain the binary at the expected path.
    """
    version = _version()
    system_binary = _system_binary_matches(version)
    if system_binary is not None:
        return system_binary
    system, machine = _detect_platform()
    asset_name, fmt = _asset_name_for(system, machine)
    cache = _cache_dir(version)
    # Asset name up to its first dot; the binary is expected in a directory
    # of that name.
    target = asset_name.split(".", maxsplit=1)[0]
    binary = cache / target / _binary_name()
    if not binary.is_file():
        _download_and_extract(version, asset_name, fmt, cache)
        if not binary.is_file():
            # Fail clearly instead of with FileNotFoundError from stat().
            msg = f"Downloaded {asset_name} but {binary} was not found after extraction"
            raise SystemExit(msg)
    if not os.access(binary, os.X_OK):
        binary.chmod(binary.stat().st_mode | 0o111)
    return binary
def main() -> None:
    """Run the resolved ``alef`` binary with this script's arguments.

    On POSIX the current process is replaced via ``os.execv``. On Windows
    ``os.execv`` starts a new process and the caller exits at once, so the
    invoking tool would not see the real exit status. There the binary is
    run as a child and its return code is passed on through ``sys.exit``.
    """
    binary = _resolve_binary()
    command = [str(binary), *sys.argv[1:]]
    if os.name == "nt":
        sys.exit(subprocess.run(command, check=False).returncode)
    os.execv(command[0], command)
# Run the hook only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()