from __future__ import annotations
import hashlib
import json
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Protocol, Tuple, Union
# Digit alphabet shared by base58_encode/base58_decode: a character's
# position in this string is its base58 digit value.
BASE58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"

# Single-character prefix that marks an identifier as base58btc-encoded.
MULTIBASE_BASE58BTC = "z"

# Two-byte prefix (0x13, 0x09) placed in front of the public key before it
# is base58-encoded, and stripped again when key material is extracted.
MULTICODEC_MLDSA65 = b"\x13\x09"

# Exact public-key length, in bytes, accepted when building a DID from a key.
MLDSA_PUBLIC_KEY_SIZE = 1952
class IdentityError(Exception):
    """Error raised by this module for invalid DIDs, malformed key material,
    and signing/verification that cannot be performed."""
def base58_encode(data: bytes) -> str:
    """Encode *data* as a base58 string using ``BASE58_ALPHABET``.

    The bytes are read as one big-endian unsigned integer and written out in
    base 58. Each leading zero byte of *data* is represented by one leading
    ``BASE58_ALPHABET[0]`` character, so empty input yields ``""``.

    Args:
        data: Raw bytes to encode.

    Returns:
        The base58 text for *data*.
    """
    value = int.from_bytes(data, "big")

    # Digits come out least-significant first; they are reversed when joined.
    digits = []
    while value:
        value, digit = divmod(value, 58)
        digits.append(BASE58_ALPHABET[digit])

    # Leading zero bytes vanish in the integer conversion, so count them
    # separately and restore them as leading pad characters.
    zero_prefix = 0
    for octet in data:
        if octet != 0:
            break
        zero_prefix += 1

    return BASE58_ALPHABET[0] * zero_prefix + "".join(reversed(digits))
def base58_decode(s: str) -> bytes:
    """Decode a base58 string produced with ``BASE58_ALPHABET`` back to bytes.

    Each leading ``BASE58_ALPHABET[0]`` character becomes one leading zero
    byte; the empty string decodes to ``b""``.

    Args:
        s: Base58 text to decode.

    Returns:
        The decoded bytes.

    Raises:
        ValueError: If *s* contains a character that is not in
            ``BASE58_ALPHABET`` (raised by ``str.index``).
    """
    value = 0
    for symbol in s:
        value = value * 58 + BASE58_ALPHABET.index(symbol)

    # Minimal big-endian encoding of the integer (empty when value == 0).
    body = value.to_bytes((value.bit_length() + 7) // 8, "big")

    # The integer form cannot carry leading zeros, so rebuild them from the
    # run of pad characters at the start of the text.
    pad_char = BASE58_ALPHABET[0]
    zero_prefix = 0
    for symbol in s:
        if symbol != pad_char:
            break
        zero_prefix += 1

    return bytes(zero_prefix) + body
class DidMethod(Enum):
    """DID methods known to this module.

    Each member's value is the method name as it appears in a DID URI
    (``did:<value>:<id>``).
    """

    LUX = "lux"
    KEY = "key"
    WEB = "web"
class VerificationMethodType(Enum):
    """Verification method kinds.

    Each member's value is the string written to, and read from, the
    ``"type"`` member of a serialized verification method.
    """

    JSON_WEB_KEY_2020 = "JsonWebKey2020"
    MULTIKEY = "Multikey"
    ML_DSA_65_VERIFICATION_KEY_2024 = "MlDsa65VerificationKey2024"
class ServiceType(Enum):
    """Service kinds.

    Each member's value is the string written to, and read from, the
    ``"type"`` member of a serialized service entry.
    """

    ZAP_AGENT = "ZapAgent"
    DID_COMM_MESSAGING = "DIDCommMessaging"
    LINKED_DOMAINS = "LinkedDomains"
    CREDENTIAL_REGISTRY = "CredentialRegistry"
@dataclass
class ServiceEndpoint:
    """Endpoint of a service: a URI plus optional accept/routing-key lists."""

    uri: str
    accept: Optional[List[str]] = None
    routing_keys: Optional[List[str]] = None

    def to_dict(self) -> Union[str, Dict[str, Any]]:
        """Return the serialized form of this endpoint.

        Returns:
            The bare URI string when both ``accept`` and ``routing_keys`` are
            ``None``; otherwise a dict with ``"uri"`` plus ``"accept"`` and
            ``"routingKeys"`` for whichever of those lists is non-empty.
        """
        has_extras = self.accept is not None or self.routing_keys is not None
        if not has_extras:
            return self.uri

        # Insertion order fixes the key order of the result.
        extras = {"accept": self.accept, "routingKeys": self.routing_keys}
        payload: Dict[str, Any] = {"uri": self.uri}
        for key, items in extras.items():
            if items:
                payload[key] = items
        return payload
@dataclass
class VerificationMethod:
    """A verification method entry: id, type, controller and optional key data."""

    id: str
    type: VerificationMethodType
    controller: str
    public_key_multibase: Optional[str] = None
    public_key_jwk: Optional[Dict[str, Any]] = None
    blockchain_account_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the camelCase dict form of this verification method.

        ``id``, ``type`` (the enum's value) and ``controller`` are always
        present; the three optional members are included only when truthy.
        """
        payload: Dict[str, Any] = {
            "id": self.id,
            "type": self.type.value,
            "controller": self.controller,
        }
        # Tuple order fixes the key order of the optional members.
        optional_members = (
            ("publicKeyMultibase", self.public_key_multibase),
            ("publicKeyJwk", self.public_key_jwk),
            ("blockchainAccountId", self.blockchain_account_id),
        )
        for key, member in optional_members:
            if member:
                payload[key] = member
        return payload
@dataclass
class Service:
    """A service entry: id, service type and endpoint."""

    id: str
    type: ServiceType
    service_endpoint: ServiceEndpoint

    def to_dict(self) -> Dict[str, Any]:
        """Return a dict with ``"id"``, ``"type"`` (the enum's value) and
        ``"serviceEndpoint"`` (the endpoint's own ``to_dict()`` result)."""
        payload: Dict[str, Any] = {"id": self.id}
        payload["type"] = self.type.value
        payload["serviceEndpoint"] = self.service_endpoint.to_dict()
        return payload
@dataclass
class DidDocument:
    """In-memory DID document that converts to and from its dict/JSON form.

    Attributes use snake_case; ``to_dict`` writes the camelCase member names
    (for example ``assertion_method`` becomes ``"assertionMethod"``) and
    ``from_dict`` reads them back.
    """

    # Identifier of the document, emitted as "id".
    id: str
    # Emitted as "@context"; every instance gets its own default list.
    context: List[str] = field(
        default_factory=lambda: [
            "https://www.w3.org/ns/did/v1",
            "https://w3id.org/security/suites/jws-2020/v1",
        ]
    )
    controller: Optional[str] = None
    verification_method: List[VerificationMethod] = field(default_factory=list)
    # Verification relationships, each stored as a list of strings.
    authentication: List[str] = field(default_factory=list)
    assertion_method: List[str] = field(default_factory=list)
    key_agreement: List[str] = field(default_factory=list)
    capability_invocation: List[str] = field(default_factory=list)
    capability_delegation: List[str] = field(default_factory=list)
    service: List[Service] = field(default_factory=list)

    def primary_verification_method(self) -> Optional[VerificationMethod]:
        """Return the first verification method, or ``None`` if there is none."""
        if not self.verification_method:
            return None
        return self.verification_method[0]

    def get_verification_method(self, id: str) -> Optional[VerificationMethod]:
        """Return the first verification method whose ``id`` equals *id*, else ``None``."""
        return next(
            (entry for entry in self.verification_method if entry.id == id),
            None,
        )

    def get_service(self, id: str) -> Optional[Service]:
        """Return the first service whose ``id`` equals *id*, else ``None``."""
        return next((entry for entry in self.service if entry.id == id), None)

    def to_dict(self) -> Dict[str, Any]:
        """Return the camelCase dict form of the document.

        ``"@context"`` and ``"id"`` are always present. Every other member is
        written only when its attribute is truthy, so empty lists and a
        missing controller are omitted.
        """
        doc: Dict[str, Any] = {"@context": self.context, "id": self.id}

        if self.controller:
            doc["controller"] = self.controller
        if self.verification_method:
            doc["verificationMethod"] = [
                entry.to_dict() for entry in self.verification_method
            ]

        # Plain string lists are copied through by reference; the tuple order
        # here fixes the key order of the output.
        relationships = (
            ("authentication", self.authentication),
            ("assertionMethod", self.assertion_method),
            ("keyAgreement", self.key_agreement),
            ("capabilityInvocation", self.capability_invocation),
            ("capabilityDelegation", self.capability_delegation),
        )
        for member, references in relationships:
            if references:
                doc[member] = references

        if self.service:
            doc["service"] = [entry.to_dict() for entry in self.service]
        return doc

    def to_json(self, indent: int = 2) -> str:
        """Serialize ``to_dict()`` with ``json.dumps`` using *indent*."""
        as_dict = self.to_dict()
        return json.dumps(as_dict, indent=indent)

    @classmethod
    def from_json(cls, json_str: str) -> "DidDocument":
        """Parse *json_str* and build a document from it via ``from_dict``."""
        return cls.from_dict(json.loads(json_str))

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "DidDocument":
        """Build a document from its camelCase dict form.

        Missing list members default to ``[]``; a missing ``"@context"`` also
        becomes ``[]`` rather than the dataclass default. A service endpoint
        may be a plain URI string or a dict with ``"uri"`` and optional
        ``"accept"`` / ``"routingKeys"``.

        Raises:
            KeyError: If ``"id"`` is missing, or a verification method or
                service entry lacks one of its required members.
            ValueError: If a ``"type"`` value is not a member of the
                corresponding enum.
        """
        methods = [
            VerificationMethod(
                id=raw["id"],
                type=VerificationMethodType(raw["type"]),
                controller=raw["controller"],
                public_key_multibase=raw.get("publicKeyMultibase"),
                public_key_jwk=raw.get("publicKeyJwk"),
                blockchain_account_id=raw.get("blockchainAccountId"),
            )
            for raw in data.get("verificationMethod", [])
        ]

        services = []
        for raw in data.get("service", []):
            raw_endpoint = raw["serviceEndpoint"]
            if isinstance(raw_endpoint, str):
                # Short form: the endpoint is just its URI.
                endpoint = ServiceEndpoint(uri=raw_endpoint)
            else:
                endpoint = ServiceEndpoint(
                    uri=raw_endpoint["uri"],
                    accept=raw_endpoint.get("accept"),
                    routing_keys=raw_endpoint.get("routingKeys"),
                )
            services.append(
                Service(
                    id=raw["id"],
                    type=ServiceType(raw["type"]),
                    service_endpoint=endpoint,
                )
            )

        return cls(
            id=data["id"],
            context=data.get("@context", []),
            controller=data.get("controller"),
            verification_method=methods,
            authentication=data.get("authentication", []),
            assertion_method=data.get("assertionMethod", []),
            key_agreement=data.get("keyAgreement", []),
            capability_invocation=data.get("capabilityInvocation", []),
            capability_delegation=data.get("capabilityDelegation", []),
            service=services,
        )
@dataclass
class Did:
    """A decentralized identifier: a method plus a method-specific id.

    Instances are hashable on ``(method, id)`` and render as
    ``did:<method>:<id>``.
    """

    method: DidMethod
    id: str

    def uri(self) -> str:
        """Return the full DID URI, ``did:<method value>:<id>``."""
        return f"did:{self.method.value}:{self.id}"

    def __str__(self) -> str:
        return self.uri()

    def __hash__(self) -> int:
        return hash((self.method, self.id))

    def extract_key_material(self) -> bytes:
        """Decode the identifier and return the key bytes it carries.

        The id must start with the base58btc multibase prefix; the remainder
        is base58-decoded and the ML-DSA-65 multicodec prefix is removed when
        present. Without that prefix the decoded bytes are returned as-is.

        Raises:
            IdentityError: If the id is empty, lacks the multibase prefix,
                is not valid base58, or decodes to fewer than two bytes.
        """
        identifier = self.id
        if not identifier:
            raise IdentityError("empty DID identifier")
        if not identifier.startswith(MULTIBASE_BASE58BTC):
            raise IdentityError(
                f"unsupported multibase encoding: expected '{MULTIBASE_BASE58BTC}', "
                f"got '{identifier[0]}'"
            )

        try:
            payload = base58_decode(identifier[1:])
        except Exception as e:
            raise IdentityError(f"invalid base58btc encoding: {e}")

        if len(payload) < 2:
            raise IdentityError("DID identifier too short")

        has_codec_prefix = payload[:2] == MULTICODEC_MLDSA65
        return payload[2:] if has_codec_prefix else payload

    def document(self) -> DidDocument:
        """Build the DID document derived from this DID.

        The document has one verification method (``<uri>#keys-1``) that is
        referenced from authentication, assertionMethod and
        capabilityInvocation, plus one ZapAgent service (``<uri>#zap-agent``)
        pointing at ``zap://<id>``. For the ``key`` and ``lux`` methods the id
        is decoded first and used as the multibase public key; ``lux``
        additionally gets a blockchain account id built from the first 20
        bytes of the key material.

        Raises:
            IdentityError: For ``key``/``lux`` DIDs whose id cannot be decoded.
        """
        did_uri = self.uri()
        key_ref = f"{did_uri}#keys-1"

        multibase_key: Optional[str] = None
        account_id: Optional[str] = None
        if self.method in (DidMethod.KEY, DidMethod.LUX):
            # Decoding happens for both methods so an undecodable id is
            # rejected even when the bytes themselves are not needed.
            raw_key = self.extract_key_material()
            multibase_key = self.id
            if self.method == DidMethod.LUX:
                account_id = f"lux:{raw_key[:20].hex()}"

        # NOTE(review): the type is JsonWebKey2020 although the key is carried
        # in publicKeyMultibase rather than publicKeyJwk - confirm intended.
        key_entry = VerificationMethod(
            id=key_ref,
            type=VerificationMethodType.JSON_WEB_KEY_2020,
            controller=did_uri,
            public_key_multibase=multibase_key,
            blockchain_account_id=account_id,
        )
        agent = Service(
            id=f"{did_uri}#zap-agent",
            type=ServiceType.ZAP_AGENT,
            service_endpoint=ServiceEndpoint(uri=f"zap://{self.id}"),
        )
        return DidDocument(
            id=did_uri,
            verification_method=[key_entry],
            authentication=[key_ref],
            assertion_method=[key_ref],
            capability_invocation=[key_ref],
            service=[agent],
        )
def parse_did(s: str) -> Did:
    """Parse a ``did:<method>:<id>`` string into a :class:`Did`.

    Only the first colon after the method name is treated as a separator, so
    the identifier may itself contain colons.

    Args:
        s: DID string to parse.

    Returns:
        A ``Did`` with the parsed method and identifier.

    Raises:
        IdentityError: If *s* does not start with ``did:``, has no
            method/identifier separator, names a method that is not a
            ``DidMethod`` value, or has an empty identifier.
    """
    if not s.startswith("did:"):
        raise IdentityError(f"invalid DID: must start with 'did:', got '{s}'")

    # These were two statements fused onto a single line, which is a syntax
    # error; they must be separate statements.
    rest = s[4:]
    parts = rest.split(":", 1)
    if len(parts) != 2:
        raise IdentityError(f"invalid DID format: expected 'did:method:id', got '{s}'")

    method_str, did_id = parts
    try:
        method = DidMethod(method_str)
    except ValueError as exc:
        # Keep the enum lookup failure attached as the explicit cause.
        raise IdentityError(f"unknown DID method: {method_str}") from exc

    if not did_id:
        raise IdentityError("DID identifier cannot be empty")
    return Did(method=method, id=did_id)
def create_did_from_key(public_key: bytes, method: DidMethod = DidMethod.KEY) -> Did:
    """Build a DID whose identifier encodes *public_key*.

    The identifier is the base58btc multibase prefix followed by the base58
    encoding of the multicodec prefix plus the key bytes.

    Args:
        public_key: Key bytes; must be exactly ``MLDSA_PUBLIC_KEY_SIZE`` long.
        method: DID method to put on the result.

    Returns:
        The new ``Did``.

    Raises:
        IdentityError: If *public_key* has the wrong length.
    """
    actual_size = len(public_key)
    if actual_size != MLDSA_PUBLIC_KEY_SIZE:
        raise IdentityError(
            f"invalid ML-DSA public key size: expected {MLDSA_PUBLIC_KEY_SIZE}, "
            f"got {actual_size}"
        )

    body = base58_encode(MULTICODEC_MLDSA65 + public_key)
    return Did(method=method, id=f"{MULTIBASE_BASE58BTC}{body}")
def create_did_from_web(domain: str, path: Optional[str] = None) -> Did:
    """Build a ``did:web`` DID from a domain and an optional URL path.

    Path segments are appended to the domain separated by colons, e.g.
    ``("example.com", "user/alice")`` gives ``example.com:user:alice``.
    Empty segments caused by leading, trailing or repeated slashes are
    dropped, so ``"/user/alice/"`` gives the same identifier instead of one
    with empty components (``example.com::user:alice:``).

    Args:
        domain: Host name; must be non-empty and contain no ``/`` or ``:``.
        path: Optional slash-separated path below the domain.

    Returns:
        A ``Did`` using ``DidMethod.WEB``.

    Raises:
        IdentityError: If *domain* is empty or contains ``/`` or ``:``.
    """
    if not domain:
        raise IdentityError("domain cannot be empty")
    if "/" in domain or ":" in domain:
        raise IdentityError(f"invalid domain for did:web: {domain}")

    # Skip empty segments: a trailing ':' would make the DID invalid and a
    # doubled ':' would not correspond to any real path component.
    segments = [segment for segment in path.split("/") if segment] if path else []
    did_id = ":".join([domain, *segments])
    return Did(method=DidMethod.WEB, id=did_id)
class StakeRegistry(Protocol):
    """Structural interface for a store of per-DID stake amounts."""

    def get_stake(self, did: Did) -> int:
        """Return the stake recorded for *did*."""

    def set_stake(self, did: Did, amount: int) -> None:
        """Record *amount* as the stake for *did*."""

    def total_stake(self) -> int:
        """Return the sum of all recorded stakes."""
class InMemoryStakeRegistry:
    """Stake registry backed by a dict keyed by DID URI."""

    def __init__(self) -> None:
        # Maps did.uri() -> staked amount.
        self._stakes: Dict[str, int] = {}

    def get_stake(self, did: Did) -> int:
        """Return the stake stored for *did*, or 0 if none was set."""
        key = did.uri()
        return self._stakes.get(key, 0)

    def set_stake(self, did: Did, amount: int) -> None:
        """Store *amount* for *did*, replacing any previous value."""
        key = did.uri()
        self._stakes[key] = amount

    def total_stake(self) -> int:
        """Return the sum of every stored stake."""
        return sum(self._stakes.values())

    def has_sufficient_stake(self, did: Did, minimum: int) -> bool:
        """Return whether the stake of *did* is at least *minimum*."""
        current = self.get_stake(did)
        return current >= minimum

    def stake_weight(self, did: Did) -> float:
        """Return the stake of *did* divided by the total stake.

        Returns 0.0 when the total is zero, avoiding a division by zero.
        """
        own = self.get_stake(did)
        overall = self.total_stake()
        return 0.0 if overall == 0 else own / overall
@dataclass
class NodeIdentity:
    """A node's DID and public key, with an optional signer and stake info."""

    did: Did
    public_key: bytes
    stake: Optional[int] = None
    stake_registry: Optional[str] = None
    # Object providing sign()/verify(); left out of repr().
    _signer: Optional[Any] = field(default=None, repr=False)

    def can_sign(self) -> bool:
        """Return whether a signer is attached to this identity."""
        return not (self._signer is None)

    def sign(self, message: bytes) -> bytes:
        """Sign *message* with the attached signer.

        Raises:
            IdentityError: If no signer is attached.
        """
        signer = self._signer
        if signer is None:
            raise IdentityError("no private key available for signing")
        return signer.sign(message)

    def verify(self, message: bytes, signature: bytes) -> bool:
        """Check *signature* over *message*.

        Uses the attached signer when there is one; otherwise builds a
        verifier from ``public_key`` via ``PQSignature.from_public_key``.

        Raises:
            IdentityError: If the crypto module cannot be imported or reports
                ``PQ_AVAILABLE`` as false.
        """
        try:
            from .crypto import PQSignature, PQ_AVAILABLE

            if not PQ_AVAILABLE:
                raise IdentityError("verification requires pqcrypto")

            checker = self._signer
            if checker is None:
                checker = PQSignature.from_public_key(self.public_key)
            return checker.verify(message, signature)
        except ImportError:
            raise IdentityError("verification requires pqcrypto")

    def document(self) -> DidDocument:
        """Return the DID document produced by this identity's DID."""
        return self.did.document()

    def with_stake(self, amount: int) -> "NodeIdentity":
        """Set ``stake`` on this instance and return the same instance."""
        self.stake = amount
        return self

    def with_registry(self, registry: str) -> "NodeIdentity":
        """Set ``stake_registry`` on this instance and return the same instance."""
        self.stake_registry = registry
        return self
def generate_identity(method: DidMethod = DidMethod.LUX) -> NodeIdentity:
    """Create a new identity with a freshly generated signing key.

    A signer is produced by ``PQSignature.generate()``; its public key is
    turned into a DID for *method*, and the signer stays attached to the
    returned identity so it can sign.

    Args:
        method: DID method for the new identity.

    Returns:
        A ``NodeIdentity`` holding the DID, the public key and the signer.

    Raises:
        IdentityError: If the crypto module cannot be imported or reports
            ``PQ_AVAILABLE`` as false, or if the generated public key is
            rejected when the DID is built.
    """
    try:
        from .crypto import PQSignature, PQ_AVAILABLE

        if not PQ_AVAILABLE:
            raise IdentityError("identity generation requires pqcrypto")

        keypair = PQSignature.generate()
        verifying_key = keypair.public_key
        new_did = create_did_from_key(verifying_key, method=method)
        return NodeIdentity(did=new_did, public_key=verifying_key, _signer=keypair)
    except ImportError:
        raise IdentityError("identity generation requires pqcrypto")
# Names exported by a star-import of this module. The base58 helpers and the
# multibase/multicodec constants are not listed here.
__all__ = [
    "IdentityError",
    "DidMethod",
    "VerificationMethodType",
    "ServiceType",
    "ServiceEndpoint",
    "VerificationMethod",
    "Service",
    "DidDocument",
    "Did",
    "parse_did",
    "create_did_from_key",
    "create_did_from_web",
    "StakeRegistry",
    "InMemoryStakeRegistry",
    "NodeIdentity",
    "generate_identity",
    "MLDSA_PUBLIC_KEY_SIZE",
]