from __future__ import annotations
import hashlib
import hmac
import os
import secrets
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Tuple
# Post-quantum primitives come from the optional `pqcrypto` package.  When it
# cannot be imported the module still loads: PQ_AVAILABLE is set to False and
# the size constants fall back to the literals below, so the length checks in
# PQKeyExchange / PQSignature keep working without the library.
try:
    from pqcrypto.kem.kyber768 import (
        generate_keypair as mlkem_keypair,
        encrypt as mlkem_encapsulate,
        decrypt as mlkem_decapsulate,
        PUBLIC_KEY_SIZE as MLKEM_PUBLIC_KEY_SIZE,
        CIPHERTEXT_SIZE as MLKEM_CIPHERTEXT_SIZE,
    )
    from pqcrypto.sign.dilithium3 import (
        generate_keypair as mldsa_keypair,
        sign as mldsa_sign,
        verify as mldsa_verify,
        PUBLIC_KEY_SIZE as MLDSA_PUBLIC_KEY_SIZE,
        SIGNATURE_SIZE as MLDSA_SIGNATURE_SIZE,
    )
    PQ_AVAILABLE = True
except ImportError:
    PQ_AVAILABLE = False
    # Fallback sizes in bytes, used only when pqcrypto is not installed.
    # NOTE(review): presumably these mirror the Kyber768 / Dilithium3 values
    # exported by pqcrypto -- confirm against the installed library version.
    MLKEM_PUBLIC_KEY_SIZE = 1184
    MLKEM_CIPHERTEXT_SIZE = 1088
    MLDSA_PUBLIC_KEY_SIZE = 1952
    MLDSA_SIGNATURE_SIZE = 3293
# X25519, the hash module and HKDF come from the optional `cryptography`
# package.  When it cannot be imported, X25519_AVAILABLE is False and these
# names stay undefined; HybridHandshake.initiate() and HybridHandshake.respond()
# check the flag before touching them.
try:
    from cryptography.hazmat.primitives.asymmetric.x25519 import (
        X25519PrivateKey,
        X25519PublicKey,
    )
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.kdf.hkdf import HKDF
    X25519_AVAILABLE = True
except ImportError:
    X25519_AVAILABLE = False
# Length in bytes that HybridHandshake.respond() requires of the peer's
# X25519 public key.
X25519_PUBLIC_KEY_SIZE = 32
# NOTE(review): not referenced anywhere else in the visible code; presumably
# the length of a single (non-hybrid) shared secret -- confirm with callers.
SHARED_SECRET_SIZE = 32
# Output length in bytes requested from HKDF when deriving the hybrid secret.
HYBRID_SHARED_SECRET_SIZE = 32
class CryptoError(Exception):
    """Error raised by this module for crypto failures.

    Covers missing optional backends, wrongly sized keys, ciphertexts or
    signatures, missing secret keys, and failed signature verification.
    """
class PQKeyExchange:
    """Key-encapsulation wrapper around pqcrypto's Kyber768 functions.

    An instance always carries a public key and optionally the matching
    secret key.  Instances built with :meth:`from_public_key` have no secret
    key and therefore cannot decapsulate.
    """

    def __init__(self, public_key: bytes, secret_key: Optional[bytes] = None):
        """Store the key material as given; no validation is performed here."""
        self._public_key = public_key
        self._secret_key = secret_key

    @classmethod
    def generate(cls) -> "PQKeyExchange":
        """Create a fresh key pair.

        Raises:
            CryptoError: if pqcrypto is not installed.
        """
        if not PQ_AVAILABLE:
            raise CryptoError("pqcrypto not available - install with: pip install pqcrypto")
        pk, sk = mlkem_keypair()
        return cls(pk, sk)

    @classmethod
    def from_public_key(cls, public_key: bytes) -> "PQKeyExchange":
        """Wrap a peer's public key (no secret key).

        Raises:
            CryptoError: if ``public_key`` is not MLKEM_PUBLIC_KEY_SIZE bytes.
        """
        if len(public_key) != MLKEM_PUBLIC_KEY_SIZE:
            raise CryptoError(
                f"Invalid ML-KEM public key size: expected {MLKEM_PUBLIC_KEY_SIZE}, "
                f"got {len(public_key)}"
            )
        return cls(public_key, None)

    @property
    def public_key(self) -> bytes:
        """The public key held by this instance."""
        return self._public_key

    def encapsulate(self, recipient_pk: bytes) -> Tuple[bytes, bytes]:
        """Encapsulate a fresh shared secret to ``recipient_pk``.

        The instance's own keys are not used; only ``recipient_pk`` matters.

        Returns:
            ``(ciphertext, shared_secret)`` as produced by the KEM.

        Raises:
            CryptoError: if pqcrypto is not installed or ``recipient_pk`` has
                the wrong length.
        """
        if not PQ_AVAILABLE:
            raise CryptoError("pqcrypto not available")
        if len(recipient_pk) != MLKEM_PUBLIC_KEY_SIZE:
            raise CryptoError(
                f"Invalid recipient public key size: expected {MLKEM_PUBLIC_KEY_SIZE}, "
                f"got {len(recipient_pk)}"
            )
        ciphertext, shared_secret = mlkem_encapsulate(recipient_pk)
        return ciphertext, shared_secret

    def decapsulate(self, ciphertext: bytes) -> bytes:
        """Recover the shared secret from ``ciphertext`` with our secret key.

        Raises:
            CryptoError: if pqcrypto is not installed, this instance has no
                secret key, or ``ciphertext`` has the wrong length.
        """
        if not PQ_AVAILABLE:
            raise CryptoError("pqcrypto not available")
        if self._secret_key is None:
            raise CryptoError("No secret key available for decapsulation")
        if len(ciphertext) != MLKEM_CIPHERTEXT_SIZE:
            raise CryptoError(
                f"Invalid ML-KEM ciphertext size: expected {MLKEM_CIPHERTEXT_SIZE}, "
                f"got {len(ciphertext)}"
            )
        # pqcrypto's KEM decrypt takes the secret key first, then the
        # ciphertext; passing them the other way round fails its length checks.
        return mlkem_decapsulate(self._secret_key, ciphertext)
class PQSignature:
    """Signature wrapper around pqcrypto's Dilithium3 functions.

    An instance always carries a public key and optionally the matching
    secret key.  Instances built with :meth:`from_public_key` can only verify.
    """

    def __init__(self, public_key: bytes, secret_key: Optional[bytes] = None):
        """Store the key material as given; no validation is performed here."""
        self._public_key = public_key
        self._secret_key = secret_key

    @classmethod
    def generate(cls) -> "PQSignature":
        """Create a fresh signing key pair.

        Raises:
            CryptoError: if pqcrypto is not installed.
        """
        if not PQ_AVAILABLE:
            raise CryptoError("pqcrypto not available - install with: pip install pqcrypto")
        pk, sk = mldsa_keypair()
        return cls(pk, sk)

    @classmethod
    def from_public_key(cls, public_key: bytes) -> "PQSignature":
        """Wrap a signer's public key for verification only.

        Raises:
            CryptoError: if ``public_key`` is not MLDSA_PUBLIC_KEY_SIZE bytes.
        """
        if len(public_key) != MLDSA_PUBLIC_KEY_SIZE:
            raise CryptoError(
                f"Invalid ML-DSA public key size: expected {MLDSA_PUBLIC_KEY_SIZE}, "
                f"got {len(public_key)}"
            )
        return cls(public_key, None)

    @property
    def public_key(self) -> bytes:
        """The public key held by this instance."""
        return self._public_key

    def sign(self, message: bytes) -> bytes:
        """Sign ``message`` with the stored secret key.

        Raises:
            CryptoError: if pqcrypto is not installed or this instance has no
                secret key.
        """
        if not PQ_AVAILABLE:
            raise CryptoError("pqcrypto not available")
        if self._secret_key is None:
            raise CryptoError("No secret key available for signing")
        # pqcrypto's sign takes the secret key first, then the message.
        return mldsa_sign(self._secret_key, message)

    def verify(self, message: bytes, signature: bytes) -> bool:
        """Check ``signature`` over ``message`` against the stored public key.

        Returns:
            ``True`` when the signature is valid.  An invalid signature is
            reported by raising, never by returning ``False``.

        Raises:
            CryptoError: if pqcrypto is not installed, ``signature`` has the
                wrong length, or verification fails for any reason.
        """
        if not PQ_AVAILABLE:
            raise CryptoError("pqcrypto not available")
        if len(signature) != MLDSA_SIGNATURE_SIZE:
            raise CryptoError(
                f"Invalid ML-DSA signature size: expected {MLDSA_SIGNATURE_SIZE}, "
                f"got {len(signature)}"
            )
        try:
            # pqcrypto's verify takes (public_key, message, signature).
            valid = mldsa_verify(self._public_key, message, signature)
        except Exception as exc:
            raise CryptoError("Signature verification failed") from exc
        # pqcrypto signals a bad signature through a False return value, so
        # the result must be inspected rather than assumed to be success.
        if valid is False:
            raise CryptoError("Signature verification failed")
        return True
@dataclass
class HybridInitiatorData:
    """Public material the initiator hands to the responder."""

    # Initiator's raw X25519 public key.
    x25519_public_key: bytes
    # Initiator's ML-KEM public key, the target of the responder's encapsulation.
    mlkem_public_key: bytes
@dataclass
class HybridResponderData:
    """Public material the responder sends back to the initiator."""

    # Responder's raw X25519 public key.
    x25519_public_key: bytes
    # ML-KEM ciphertext encapsulated to the initiator's ML-KEM public key.
    mlkem_ciphertext: bytes
class HandshakeRole(Enum):
    """Which side of the hybrid handshake a HybridHandshake instance plays."""

    # Side that starts the exchange and later calls finalize().
    INITIATOR = "initiator"
    # Side that answers the exchange and later calls complete().
    RESPONDER = "responder"
class HybridHandshake:
    """Two-party handshake combining X25519 with an ML-KEM encapsulation.

    Flow:
        1. The initiator calls :meth:`initiate` and sends :attr:`public_data`.
        2. The responder calls :meth:`respond`, which returns its handshake
           object plus the data to send back.
        3. The initiator calls :meth:`finalize`; the responder calls
           :meth:`complete`.  Both return the same derived secret.

    The X25519 private key is dropped from the instance once the secret has
    been derived, so ``finalize``/``complete`` can only succeed once.
    """

    def __init__(
        self,
        x25519_private: Optional[bytes],
        x25519_public: bytes,
        mlkem: PQKeyExchange,
        role: HandshakeRole,
        mlkem_shared: Optional[bytes] = None,
    ):
        """Store handshake state.

        Args:
            x25519_private: Raw X25519 private key bytes, or ``None``.
            x25519_public: Raw X25519 public key bytes.
            mlkem: ML-KEM key holder for this side.
            role: Which side of the handshake this instance plays.
            mlkem_shared: Responder only - the ML-KEM shared secret that
                belongs to the ciphertext already sent to the initiator.
        """
        self._x25519_private = x25519_private
        self._x25519_public = x25519_public
        self._mlkem = mlkem
        self._role = role
        self._mlkem_shared = mlkem_shared

    @classmethod
    def initiate(cls) -> "HybridHandshake":
        """Start a handshake as initiator with fresh X25519 and ML-KEM keys.

        Raises:
            CryptoError: if `cryptography` or `pqcrypto` is not installed.
        """
        if not X25519_AVAILABLE:
            raise CryptoError(
                "cryptography not available - install with: pip install cryptography"
            )
        if not PQ_AVAILABLE:
            raise CryptoError(
                "pqcrypto not available - install with: pip install pqcrypto"
            )
        x25519_private = X25519PrivateKey.generate()
        x25519_public = x25519_private.public_key().public_bytes_raw()
        x25519_private_bytes = x25519_private.private_bytes_raw()
        mlkem = PQKeyExchange.generate()
        return cls(
            x25519_private=x25519_private_bytes,
            x25519_public=x25519_public,
            mlkem=mlkem,
            role=HandshakeRole.INITIATOR,
        )

    @property
    def public_data(self) -> HybridInitiatorData:
        """This side's X25519 and ML-KEM public keys, packaged for the peer."""
        return HybridInitiatorData(
            x25519_public_key=self._x25519_public,
            mlkem_public_key=self._mlkem.public_key,
        )

    @classmethod
    def respond(
        cls, initiator_data: HybridInitiatorData
    ) -> Tuple["HybridHandshake", HybridResponderData]:
        """Answer an initiator's handshake.

        Generates the responder's X25519 key pair and encapsulates an ML-KEM
        secret to the initiator's ML-KEM public key.  The resulting shared
        secret is kept on the returned handshake so that :meth:`complete`
        derives the same key the initiator obtains by decapsulating.

        Returns:
            ``(handshake, response)`` where ``response`` must be sent to the
            initiator.

        Raises:
            CryptoError: if a backend is missing or either initiator key has
                the wrong length.
        """
        if not X25519_AVAILABLE:
            raise CryptoError("cryptography not available")
        if not PQ_AVAILABLE:
            raise CryptoError("pqcrypto not available")
        if len(initiator_data.x25519_public_key) != X25519_PUBLIC_KEY_SIZE:
            raise CryptoError(
                f"Invalid X25519 public key size: expected {X25519_PUBLIC_KEY_SIZE}, "
                f"got {len(initiator_data.x25519_public_key)}"
            )
        if len(initiator_data.mlkem_public_key) != MLKEM_PUBLIC_KEY_SIZE:
            raise CryptoError(
                f"Invalid ML-KEM public key size: expected {MLKEM_PUBLIC_KEY_SIZE}, "
                f"got {len(initiator_data.mlkem_public_key)}"
            )
        x25519_private = X25519PrivateKey.generate()
        x25519_public = x25519_private.public_key().public_bytes_raw()
        x25519_private_bytes = x25519_private.private_bytes_raw()
        mlkem = PQKeyExchange.generate()
        # Keep the secret that goes with this exact ciphertext: every
        # encapsulation yields a new random secret, so it cannot be
        # reproduced later by encapsulating again.
        mlkem_ciphertext, mlkem_shared = mlkem.encapsulate(
            initiator_data.mlkem_public_key
        )
        response = HybridResponderData(
            x25519_public_key=x25519_public,
            mlkem_ciphertext=mlkem_ciphertext,
        )
        handshake = cls(
            x25519_private=x25519_private_bytes,
            x25519_public=x25519_public,
            mlkem=mlkem,
            role=HandshakeRole.RESPONDER,
            mlkem_shared=mlkem_shared,
        )
        return handshake, response

    def finalize(self, responder_data: HybridResponderData) -> bytes:
        """Initiator side: derive the hybrid secret from the responder's reply.

        Raises:
            CryptoError: if called on a responder, if the X25519 private key
                has already been consumed, or if ML-KEM decapsulation rejects
                the ciphertext.
        """
        if self._role != HandshakeRole.INITIATOR:
            raise CryptoError("finalize() can only be called by initiator")
        if self._x25519_private is None:
            raise CryptoError("X25519 private key not available")
        x25519_shared = self._x25519_exchange(
            self._x25519_private, responder_data.x25519_public_key
        )
        mlkem_shared = self._mlkem.decapsulate(responder_data.mlkem_ciphertext)
        # Drop the private key reference once the secret has been computed.
        self._x25519_private = None
        return self._derive_hybrid_secret(x25519_shared, mlkem_shared)

    def complete(
        self,
        initiator_data: HybridInitiatorData,
        mlkem_shared: Optional[bytes] = None,
    ) -> bytes:
        """Responder side: derive the hybrid secret.

        Args:
            initiator_data: The initiator's public keys.
            mlkem_shared: Optional explicit ML-KEM shared secret.  When
                omitted, the secret recorded by :meth:`respond` is used.

        Raises:
            CryptoError: if called on an initiator, if the X25519 private key
                has already been consumed, or if no ML-KEM shared secret is
                available.
        """
        if self._role != HandshakeRole.RESPONDER:
            raise CryptoError("complete() can only be called by responder")
        if self._x25519_private is None:
            raise CryptoError("X25519 private key not available")
        if mlkem_shared is None:
            mlkem_shared = self._mlkem_shared
        if mlkem_shared is None:
            # Encapsulating again here would produce a secret unrelated to the
            # ciphertext the initiator received, so fail instead.
            raise CryptoError("ML-KEM shared secret not available")
        x25519_shared = self._x25519_exchange(
            self._x25519_private, initiator_data.x25519_public_key
        )
        # Drop secret references once the hybrid key can be derived.
        self._x25519_private = None
        self._mlkem_shared = None
        return self._derive_hybrid_secret(x25519_shared, mlkem_shared)

    @staticmethod
    def _x25519_exchange(private_bytes: bytes, peer_public_bytes: bytes) -> bytes:
        """Run X25519 between our raw private key and the peer's raw public key."""
        private_key = X25519PrivateKey.from_private_bytes(private_bytes)
        peer_public = X25519PublicKey.from_public_bytes(peer_public_bytes)
        return private_key.exchange(peer_public)

    @staticmethod
    def _derive_hybrid_secret(x25519_shared: bytes, mlkem_shared: bytes) -> bytes:
        """Combine both shared secrets into one key with HKDF-SHA256.

        The input keying material is the X25519 secret followed by the ML-KEM
        secret; the output is HYBRID_SHARED_SECRET_SIZE bytes long.
        """
        ikm = x25519_shared + mlkem_shared
        hkdf = HKDF(
            algorithm=hashes.SHA256(),
            length=HYBRID_SHARED_SECRET_SIZE,
            salt=b"ZAP-HYBRID-HANDSHAKE-v1",
            info=b"shared-secret",
        )
        return hkdf.derive(ikm)


def hybrid_handshake() -> Tuple[bytes, bytes]:
    """Run a complete handshake in-process between a new initiator and responder.

    Returns:
        ``(initiator_secret, responder_secret)``; the two values are equal.

    Raises:
        CryptoError: if `cryptography` or `pqcrypto` is not installed.
    """
    initiator = HybridHandshake.initiate()
    init_data = initiator.public_data
    responder, resp_data = HybridHandshake.respond(init_data)
    initiator_secret = initiator.finalize(resp_data)
    # The responder reuses the ML-KEM secret recorded by respond(), which is
    # the one matching the ciphertext the initiator just decapsulated.
    responder_secret = responder.complete(init_data)
    return initiator_secret, responder_secret
# Names exported by `from <module> import *`.
__all__ = [
    # Backend availability flags.
    "PQ_AVAILABLE",
    "X25519_AVAILABLE",
    # Size constants (bytes).
    "MLKEM_PUBLIC_KEY_SIZE",
    "MLKEM_CIPHERTEXT_SIZE",
    "MLDSA_PUBLIC_KEY_SIZE",
    "MLDSA_SIGNATURE_SIZE",
    "X25519_PUBLIC_KEY_SIZE",
    "SHARED_SECRET_SIZE",
    "HYBRID_SHARED_SECRET_SIZE",
    # Exception type.
    "CryptoError",
    # Post-quantum primitives.
    "PQKeyExchange",
    "PQSignature",
    # Hybrid handshake types and helper.
    "HybridInitiatorData",
    "HybridResponderData",
    "HybridHandshake",
    "hybrid_handshake",
]