from __future__ import annotations
import argparse
import json
import shutil
import subprocess
import sys
import tempfile
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any, Dict, Sequence
# Directory one level above the folder that contains this script. It is used
# as the working directory for the CLI subprocesses and as the base of the
# default --binary path.
ROOT = Path(__file__).resolve().parents[1]
def parse_args(argv: Sequence[str]) -> argparse.Namespace:
    """Parse the command-line options of the lifecycle report writer.

    Args:
        argv: Argument strings to parse, without the program name.

    Returns:
        Namespace with ``report_dir``, ``report_prefix``, ``listen``,
        ``telemetry_endpoint``, ``tls``, ``admin_token``, ``query``,
        ``max_requests`` (int) and ``binary``.
    """
    cli = argparse.ArgumentParser(
        description="Write the Iridium service lifecycle evidence artifacts"
    )
    # Plain string options, registered in the order they show up in --help.
    string_options = (
        ("--report-dir", "artifacts"),
        ("--report-prefix", "service_lifecycle"),
        ("--listen", "127.0.0.1:7001"),
        ("--telemetry-endpoint", "stdout"),
        ("--tls", "operator-optional"),
        ("--admin-token", "local-dev"),
        ("--query", "MATCH (n) RETURN n LIMIT 1"),
    )
    for flag, fallback in string_options:
        cli.add_argument(flag, default=fallback)
    cli.add_argument("--max-requests", type=int, default=6)
    cli.add_argument(
        "--binary",
        default=str(ROOT / "target" / "debug" / "ir"),
        help="Path to the Iridium CLI binary",
    )
    return cli.parse_args(argv)
def run_json(command: list[str]) -> Dict[str, Any]:
    """Run *command* from ROOT and return its stdout decoded as JSON.

    Args:
        command: Program and arguments, passed without a shell.

    Returns:
        The value parsed from the command's standard output.

    Raises:
        subprocess.CalledProcessError: If the command exits nonzero.
        json.JSONDecodeError: If standard output is not valid JSON.
    """
    # capture_output=True pipes both stdout and stderr, as text here.
    result = subprocess.run(
        command,
        cwd=ROOT,
        capture_output=True,
        text=True,
        check=True,
    )
    raw_output = result.stdout
    return json.loads(raw_output)
def run_command(command: list[str]) -> None:
    """Run *command* from ROOT, discarding its captured output.

    Args:
        command: Program and arguments, passed without a shell.

    Raises:
        subprocess.CalledProcessError: If the command exits nonzero.
    """
    outcome = subprocess.run(
        command,
        cwd=ROOT,
        capture_output=True,
        text=True,
    )
    # Raises CalledProcessError carrying the captured streams on failure.
    outcome.check_returncode()
def http_get(url: str, token: str | None = None) -> Dict[str, Any]:
    """Issue a GET request and summarise the response.

    Args:
        url: Address to fetch.
        token: Optional bearer token; when truthy it is sent in an
            ``Authorization`` header.

    Returns:
        A dict with ``status``, ``content_type`` and ``body``. ``body`` is
        the parsed JSON value when the content type mentions
        ``application/json``; otherwise it is the UTF-8 decoded text.
    """
    auth_headers = {"Authorization": f"Bearer {token}"} if token else {}
    get_request = urllib.request.Request(url, headers=auth_headers, method="GET")
    with urllib.request.urlopen(get_request, timeout=2) as reply:
        text = reply.read().decode("utf-8")
        kind = reply.headers.get("Content-Type", "")
        status_code = reply.status
    is_json = "application/json" in kind
    return {
        "status": status_code,
        "content_type": kind,
        "body": json.loads(text) if is_json else text,
    }
def wait_for_livez(base_url: str, attempts: int = 20) -> None:
    """Poll ``{base_url}/livez`` until it answers with HTTP 200.

    Each unsuccessful probe is followed by a 0.1 second pause.

    Args:
        base_url: Scheme and authority of the service, without a trailing slash.
        attempts: Maximum number of probes to make.

    Raises:
        RuntimeError: If no probe returned status 200.
    """
    probe_url = f"{base_url}/livez"
    retryable = (urllib.error.URLError, ConnectionError, TimeoutError)
    for _probe in range(attempts):
        try:
            alive = http_get(probe_url)["status"] == 200
        except retryable:
            # The listener is not accepting requests yet; try again shortly.
            alive = False
        if alive:
            return
        time.sleep(0.1)
    raise RuntimeError("service failed to become ready for lifecycle report")
def write_markdown(
    path: Path,
    payload: Dict[str, Any],
    checks: Dict[str, Dict[str, Any]],
) -> None:
    """Render the lifecycle report as Markdown and write it to *path*.

    Args:
        path: Destination file; written as UTF-8 with a trailing newline.
        payload: Report data. Reads ``schema``, ``service.profile_id``,
            ``operator_inputs``, ``lifecycle.stop_requested``,
            ``query_fixture`` and ``validation.warnings``.
        checks: Per-endpoint results; each entry supplies ``status`` and
            ``content_type`` for one bullet under "Checks".
    """

    def as_flag(value: Any) -> str:
        # Booleans are shown lowercase ("true"/"false") in the report.
        return str(value).lower()

    report = ["# Service Lifecycle Report", ""]
    report.append(f"- schema: `{payload['schema']}`")
    report.append(f"- profile_id: `{payload['service']['profile_id']}`")
    operator_inputs = payload["operator_inputs"]
    report.append(f"- listen_address: `{operator_inputs['listen']}`")
    report.append(f"- telemetry_endpoint: `{operator_inputs['telemetry_endpoint']}`")
    report.append(f"- tls: `{operator_inputs['tls']}`")
    report.append(
        f"- admin_token_configured: `{as_flag(operator_inputs['admin_token_configured'])}`"
    )
    report.append(
        f"- lifecycle_stop_requested: `{as_flag(payload['lifecycle']['stop_requested'])}`"
    )
    report += ["", "## Checks"]

    report += [
        f"- `{check_name}`: status={outcome['status']} content_type=`{outcome['content_type']}`"
        for check_name, outcome in checks.items()
    ]

    fixture = payload["query_fixture"]
    report += [
        "",
        "## Query Fixture",
        f"- seeded_node_id: `{fixture['seeded_node_id']}`",
        f"- query: `{fixture['query']}`",
        f"- row_count: `{fixture['row_count']}`",
        "",
        "## Validation Warnings",
    ]

    notices = payload["validation"]["warnings"]
    report += [f"- {notice}" for notice in notices] if notices else ["- none"]

    path.write_text("\n".join(report) + "\n", encoding="utf-8")
def _stop_service(process: subprocess.Popen[str]) -> None:
    """Ensure the service process has exited and its pipes are released."""
    if process.poll() is None:
        process.terminate()
        try:
            # communicate() keeps draining the pipes while waiting, so a
            # chatty child cannot stall on a full pipe during shutdown.
            process.communicate(timeout=2)
        except subprocess.TimeoutExpired:
            process.kill()
            process.communicate(timeout=2)
        return
    # Already exited: close any pipe that was never drained.
    for stream in (process.stdout, process.stderr):
        if stream is not None and not stream.closed:
            stream.close()


def main(argv: Sequence[str]) -> int:
    """Exercise a temporary service instance and write the lifecycle reports.

    Steps: collect the ``service-report`` and ``service-validate`` JSON,
    seed one node into a throwaway data directory, start ``service-serve``,
    probe its endpoints, request a stop through the admin endpoint, then
    write ``<prefix>_report.json`` and ``<prefix>_report.md`` into the
    report directory.

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        0 once both report files have been written.

    Raises:
        RuntimeError: If the service never answers ``/livez``, exits with a
            nonzero status, or the query endpoint does not return a JSON
            object.
        subprocess.CalledProcessError: If a one-shot CLI invocation fails.
        subprocess.TimeoutExpired: If the service does not exit within the
            allotted time after the stop request.
        urllib.error.URLError: If one of the endpoint checks fails.
    """
    args = parse_args(argv)
    report_dir = Path(args.report_dir)
    report_dir.mkdir(parents=True, exist_ok=True)
    binary = str(Path(args.binary))
    base_url = f"http://{args.listen}"

    # Flags shared by service-validate and service-serve, in CLI order.
    operator_flags = [
        "--listen",
        args.listen,
        "--telemetry-endpoint",
        args.telemetry_endpoint,
        "--tls",
        args.tls,
        "--admin-token",
        args.admin_token,
    ]
    # The served instance gets one request more than --max-requests;
    # presumably this leaves room for the readiness probe - TODO confirm.
    served_max_requests = args.max_requests + 1

    service_report = run_json([binary, "service-report", "--listen", args.listen])
    validation_report = run_json(
        [
            binary,
            "service-validate",
            *operator_flags,
            "--max-requests",
            str(args.max_requests),
        ]
    )

    temp_root = Path(tempfile.mkdtemp(prefix="iridium-service-lifecycle-"))
    service_process: subprocess.Popen[str] | None = None
    try:
        data_dir = temp_root / "data"
        data_dir.mkdir(parents=True, exist_ok=True)
        run_command(
            [
                binary,
                "--data",
                str(data_dir),
                "ingest-node",
                "1",
                "1",
                "2,3",
            ]
        )
        service_process = subprocess.Popen(
            [
                binary,
                "--data",
                str(data_dir),
                "service-serve",
                *operator_flags,
                "--max-requests",
                str(served_max_requests),
            ],
            cwd=ROOT,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        wait_for_livez(base_url)

        encoded_query = urllib.parse.quote_plus(args.query)
        # Dict literals evaluate in order, so the stop request is sent last.
        checks = {
            "livez": http_get(f"{base_url}/livez"),
            "readyz": http_get(f"{base_url}/readyz"),
            "metrics": http_get(f"{base_url}/metrics"),
            "query": http_get(f"{base_url}/v1/query?q={encoded_query}"),
            "admin_status": http_get(f"{base_url}/admin/status", args.admin_token),
            "admin_lifecycle_stop": http_get(
                f"{base_url}/admin/lifecycle?action=stop",
                args.admin_token,
            ),
        }

        # Popen.wait() can deadlock when stdout/stderr are pipes that nobody
        # reads; communicate() drains both streams while waiting for exit.
        _, service_stderr = service_process.communicate(timeout=3)
        if service_process.returncode != 0:
            raise RuntimeError(f"service-serve exited nonzero: {service_stderr}")

        query_body = checks["query"]["body"]
        if not isinstance(query_body, dict):
            # http_get leaves non-JSON bodies as text; fail with a clear
            # message instead of an AttributeError on ``.get``.
            raise RuntimeError(
                "query endpoint did not return a JSON object "
                f"(content type: {checks['query']['content_type']!r})"
            )
        query_rows = query_body.get("rows", [])

        payload = {
            "schema": "iridium.service-lifecycle-report.v1",
            "service": service_report,
            "validation": validation_report,
            "operator_inputs": {
                "listen": args.listen,
                "telemetry_endpoint": args.telemetry_endpoint,
                "tls": args.tls,
                "admin_token_configured": bool(args.admin_token),
                "max_requests": served_max_requests,
            },
            "lifecycle": {
                "livez_status": checks["livez"]["status"],
                "readyz_status": checks["readyz"]["status"],
                "admin_status": checks["admin_status"]["status"],
                "stop_status": checks["admin_lifecycle_stop"]["status"],
                "stop_requested": True,
            },
            "query_fixture": {
                "seeded_node_id": 1,
                "query": args.query,
                "row_count": len(query_rows),
            },
            "checks": checks,
        }

        json_path = report_dir / f"{args.report_prefix}_report.json"
        md_path = report_dir / f"{args.report_prefix}_report.md"
        json_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
        write_markdown(md_path, payload, checks)
        print(f"wrote: {json_path}")
        print(f"wrote: {md_path}")
        return 0
    finally:
        try:
            if service_process is not None:
                _stop_service(service_process)
        finally:
            # Remove the scratch data even if stopping the service failed.
            shutil.rmtree(temp_root, ignore_errors=True)
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the exit status.
    sys.exit(main(sys.argv[1:]))