from __future__ import annotations
import functools
import os
from dataclasses import dataclass
from typing import Any, Callable, Optional
import httpx
_PROTOCOL_TAG: bytes = bytes([
0x44, 0x79, 0x6f, 0x6c, 0x6f, 0x50, 0x61, 0x73, 0x73, 0x70, 0x6f, 0x72, 0x74,
0x20, 0x76, 0x32, 0x2e, 0x38, 0x2e, 0x30,
0x7c, 0x64, 0x79, 0x6f, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x69, 0x61, 0x6e,
])
__all__ = [
"PassportClient",
"PassportError",
"PassportReceipt",
"a1_guard",
]
class PassportError(Exception):
def __init__(
self,
message: str,
error_code: str = "PASSPORT_ERROR",
http_status: Optional[int] = None,
*,
status: Optional[int] = None,
) -> None:
super().__init__(message)
self.error_code = error_code
_resolved = http_status if http_status is not None else status
self.http_status = _resolved
self.status = _resolved
@dataclass(frozen=True)
class PassportReceipt:
passport_namespace: str
fingerprint_hex: str
capability_mask_hex: str
narrowing_commitment_hex: str
chain_depth: int
@classmethod
def _from_response(cls, raw: dict) -> "PassportReceipt":
data = raw.get("receipt", raw)
try:
return cls(
passport_namespace=data["passport_namespace"],
fingerprint_hex=data["fingerprint_hex"],
capability_mask_hex=data["capability_mask_hex"],
narrowing_commitment_hex=data["narrowing_commitment_hex"],
chain_depth=data["chain_depth"],
)
except KeyError as exc:
raise PassportError(
f"malformed receipt: missing field {exc}",
error_code="MALFORMED_RECEIPT",
) from exc
class PassportClient:
def __init__(
self,
base_url: str,
*,
timeout: float = 10.0,
headers: Optional[dict[str, str]] = None,
) -> None:
self._base_url = base_url.rstrip("/")
self._client = httpx.Client(
timeout=timeout,
headers=headers or {},
)
self._async_client: Optional[httpx.AsyncClient] = None
def _async(self) -> httpx.AsyncClient:
if self._async_client is None:
self._async_client = httpx.AsyncClient(
base_url=self._base_url,
timeout=self._client.timeout,
headers=dict(self._client.headers),
)
return self._async_client
def authorize(
self,
*,
signed_chain: dict,
intent_name: str,
executor_pk_hex: str,
intent_params: Optional[dict] = None,
) -> PassportReceipt:
payload = {
"chain": signed_chain,
"intent_name": intent_name,
"executor_pk_hex": executor_pk_hex,
"intent_params": intent_params or {},
}
resp = self._client.post(f"{self._base_url}/v1/passport/authorize", json=payload)
return self._parse_response(resp)
async def authorize_async(
self,
*,
signed_chain: dict,
intent_name: str,
executor_pk_hex: str,
intent_params: Optional[dict] = None,
) -> PassportReceipt:
payload = {
"chain": signed_chain,
"intent_name": intent_name,
"executor_pk_hex": executor_pk_hex,
"intent_params": intent_params or {},
}
resp = await self._async().post("/v1/passport/authorize", json=payload)
return self._parse_response(resp)
@staticmethod
def _parse_response(resp: httpx.Response) -> PassportReceipt:
if resp.status_code != 200:
try:
body = resp.json()
msg = body.get("error", resp.text)
code = body.get("error_code", "AUTHORIZATION_FAILED")
except Exception:
msg = resp.text
code = "AUTHORIZATION_FAILED"
raise PassportError(msg, error_code=code, http_status=resp.status_code)
data = resp.json()
return PassportReceipt._from_response(data)
def close(self) -> None:
self._client.close()
if self._async_client is not None:
import asyncio
try:
loop = asyncio.get_event_loop()
if loop.is_running():
loop.create_task(self._async_client.aclose())
else:
loop.run_until_complete(self._async_client.aclose())
except RuntimeError:
pass
def a1_guard(
*,
client: PassportClient,
capability: str,
chain_kwarg: str = "signed_chain",
executor_kwarg: str = "executor_pk_hex",
) -> Callable:
def decorator(fn: Callable) -> Callable:
import asyncio
import inspect
if inspect.iscoroutinefunction(fn):
@functools.wraps(fn)
async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
chain = kwargs.get(chain_kwarg)
executor_pk = kwargs.get(executor_kwarg, "")
if chain is None:
raise PassportError(
f"missing required kwarg '{chain_kwarg}'",
error_code="MISSING_CHAIN",
)
await client.authorize_async(
signed_chain=chain,
intent_name=capability,
executor_pk_hex=executor_pk,
)
return await fn(*args, **kwargs)
return async_wrapper
else:
@functools.wraps(fn)
def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
chain = kwargs.get(chain_kwarg)
executor_pk = kwargs.get(executor_kwarg, "")
if chain is None:
raise PassportError(
f"missing required kwarg '{chain_kwarg}'",
error_code="MISSING_CHAIN",
)
client.authorize(
signed_chain=chain,
intent_name=capability,
executor_pk_hex=executor_pk,
)
return fn(*args, **kwargs)
return sync_wrapper
return decorator