power_house 0.3.6

Deterministic verification and provenance system for portable computational identities using .pha artifacts and Rootprint graphs.
Documentation
#!/usr/bin/env python3

from __future__ import annotations

from datetime import datetime, timedelta, timezone
import importlib.util
import json
from pathlib import Path
import tempfile


ROOT = Path(__file__).resolve().parents[1]
STATUS_API = ROOT / "infra" / "monitoring" / "status_api.py"


def load_module():
    spec = importlib.util.spec_from_file_location("powerhouse_status_api", STATUS_API)
    module = importlib.util.module_from_spec(spec)
    assert spec.loader is not None
    spec.loader.exec_module(module)
    return module


def registry_state(generated_at: str) -> dict:
    validators = [
        {
            "node_id": f"validator-{index}",
            "healthy": True,
            "identity_verified": True,
            "system_metrics_reachable": True,
            "peer_links": 3,
        }
        for index in range(1, 5)
    ]
    return {
        "schema": "power-house-validator-registry-health-v1",
        "chain_id": 177155,
        "generated_at": generated_at,
        "registry_verified": True,
        "validators_total": 4,
        "validators_healthy": 4,
        "peer_link_observations": 12,
        "validators": validators,
        "error": None,
    }


def main() -> None:
    prometheus = (ROOT / "infra" / "monitoring" / "prometheus.yml").read_text()
    assert "/etc/prometheus/file_sd/powerhouse-validators.json" in prometheus
    assert "/etc/prometheus/file_sd/powerhouse-systems.json" in prometheus
    assert "__NODE" not in prometheus
    deployment = (ROOT / "scripts" / "deploy_monitoring_stack.sh").read_text()
    assert "powerhouse-validator-registry.timer" in deployment
    assert "validator-registry.json" in deployment
    website = (ROOT / "publicpower" / "app.js").read_text()
    assert "validators_total) || 3" not in website

    module = load_module()
    now = datetime.now(timezone.utc)
    with tempfile.TemporaryDirectory(prefix="powerhouse-status-registry-") as temp:
        state_path = Path(temp) / "state.json"
        module.REGISTRY_STATE_PATH = str(state_path)
        module.query = lambda expression: (
            1.0 if "probe_success" in expression and "avg_over_time" not in expression else 0.999
        )
        module.fetch_json = lambda url, data=None, headers=None: {
            "data": {"startTime": (now - timedelta(days=2)).isoformat()}
        }
        module.rpc = lambda method: {
            "eth_chainId": hex(177155),
            "eth_blockNumber": hex(42),
            "web3_clientVersion": "power-house/test",
        }[method]

        state_path.write_text(json.dumps(registry_state(now.isoformat())))
        snapshot = module.snapshot()
        assert snapshot["status"] == "operational"
        assert snapshot["validators_healthy"] == 4
        assert snapshot["validators_total"] == 4
        assert snapshot["peer_connections"] == 12
        assert snapshot["validator_registry"] == {
            "fresh": True,
            "identity_verified": 4,
            "verified": True,
        }

        stale = now - timedelta(minutes=5)
        state_path.write_text(json.dumps(registry_state(stale.isoformat())))
        snapshot = module.snapshot()
        assert snapshot["status"] == "outage"
        assert snapshot["validator_registry"]["fresh"] is False

    print("test_status_registry: PASS")


if __name__ == "__main__":
    main()