import hashlib
import time
import synta
import synta.mtc as mtc
def _enc(fn):
    """Run *fn* against a fresh DER encoder and return the encoded result.

    Args:
        fn: Callable that receives the new ``synta.Encoder`` and is expected
            to call one or more of its ``encode_*`` methods.

    Returns:
        Whatever ``Encoder.finish()`` yields once *fn* has returned.
    """
    encoder = synta.Encoder(synta.Encoding.DER)
    fn(encoder)
    return encoder.finish()
def _seq(*parts):
    """Wrap the concatenation of *parts* in a DER SEQUENCE.

    Args:
        *parts: Byte strings that are joined back-to-back to form the
            SEQUENCE content (callers pass already-encoded elements).

    Returns:
        The finished encoder output for the SEQUENCE.
    """
    body = b"".join(parts)
    encoder = synta.Encoder(synta.Encoding.DER)
    encoder.encode_sequence(body)
    return encoder.finish()
def _explicit(n, content):
    """Wrap *content* in an explicit tag numbered *n* of class ``"Context"``.

    Args:
        n: Tag number handed to ``encode_explicit_tag``.
        content: Bytes placed inside the tag.

    Returns:
        The finished encoder output for the tagged value.
    """
    encoder = synta.Encoder(synta.Encoding.DER)
    encoder.encode_explicit_tag(n, "Context", content)
    return encoder.finish()
def _oid(s):
    """Return the DER encoding of the OID given as the string *s*."""
    oid = synta.ObjectIdentifier(s)
    return _enc(lambda encoder: encoder.encode_oid(oid))
def _oct(b):
    """Return the DER OCTET STRING encoding of the bytes *b*."""

    def write(encoder):
        encoder.encode_octet_string(b)

    return _enc(write)
def _bit(b):
    """Return the DER BIT STRING encoding of the bytes *b*.

    The ``synta.BitString`` is built with ``0`` as its second argument
    (presumably the unused-bit count — confirm against the synta API).
    """
    bits = synta.BitString(b, 0)
    return _enc(lambda encoder: encoder.encode_bit_string(bits))
def _gt(yr, mo, d, hr, mi, s):
    """Return the DER GeneralizedTime encoding of the given date and time.

    Args:
        yr, mo, d: Year, month and day.
        hr, mi, s: Hour, minute and second.

    Returns:
        The encoded GeneralizedTime bytes.
    """
    # The seventh constructor argument is always None here; its meaning is
    # not visible in this file (presumably sub-second precision — confirm).
    moment = synta.GeneralizedTime(yr, mo, d, hr, mi, s, None)
    return _enc(lambda encoder: encoder.encode_generalized_time(moment))
def _int(v):
    """Return the DER INTEGER encoding of *v*."""

    def write(encoder):
        encoder.encode_integer(v)

    return _enc(write)
def _name(cn: str) -> bytes:
    """Build a DER Name holding a single commonName attribute.

    The result is nested as
    ``SEQUENCE { SET { SEQUENCE { OID 2.5.4.3, UTF8String cn } } }``.

    Args:
        cn: Common-name text, encoded as a UTF8String.

    Returns:
        The DER bytes of the outer SEQUENCE.
    """
    # Reuse the shared helpers rather than re-spelling the OID encoding
    # inline; _seq joins its arguments, so the parts are passed separately.
    attribute = _seq(
        _oid("2.5.4.3"),  # commonName
        _enc(lambda encoder: encoder.encode_utf8_string(cn)),
    )
    # There is no SET helper in this file, so the SET is encoded directly.
    rdn_encoder = synta.Encoder(synta.Encoding.DER)
    rdn_encoder.encode_set(attribute)
    return _seq(rdn_encoder.finish())
# --- Object identifiers (dotted-decimal strings) ---------------------------
_SHA256_OID = "2.16.840.1.101.3.4.2.1"  # SHA-256
_EC_OID = "1.2.840.10045.2.1"  # id-ecPublicKey
_P256_OID = "1.2.840.10045.3.1.7"  # prime256v1 (P-256) named curve
_RSA_OID = "1.2.840.113549.1.1.1"  # rsaEncryption
_ECDSA_OID = "1.2.840.10045.4.3.2"  # ecdsa-with-SHA256
# NOTE(review): identifies the cosigner in the subtree-signature record
# assembled further down; the arc's registration is not shown in this file.
_COSIGNER_OID = "1.3.6.1.4.1.44363.47.10.1"

# --- Pre-encoded DER fragments shared by the demos -------------------------
_NULL = _enc(lambda e: e.encode_null())

# AlgorithmIdentifier-shaped SEQUENCEs: OID followed by parameters (if any).
_SHA256_ALG = _seq(_oid(_SHA256_OID), _NULL)
_EC_ALG = _seq(_oid(_EC_OID), _oid(_P256_OID))
_RSA_ALG = _seq(_oid(_RSA_OID), _NULL)
_ECDSA_ALG = _seq(_oid(_ECDSA_OID))  # OID only, no parameters element

# Log identity: a SEQUENCE of the hash algorithm and an SPKI-shaped SEQUENCE
# whose key bits are a fixed filler pattern rather than a real RSA key.
_LOG_SPKI = _seq(_RSA_ALG, _bit(b"\x01\x02\x03\x04" * 8))
_LOG_ID_DER = _seq(_SHA256_ALG, _LOG_SPKI)

# 32 filler bytes used wherever a signature value is required.
_DUMMY_SIG = b"\xca\xfe\xba\xbe" * 8
def section(title: str) -> None:
    """Print *title* framed above and below by a 60-character rule.

    A blank line is emitted first so consecutive sections are separated.
    """
    rule = "─" * 60
    print(f"\n{rule}\n{title}\n{rule}")
def _make_ec_spki(domain: str) -> bytes:
    """Build a deterministic, SPKI-shaped DER blob for *domain*.

    The "public key" is ``0x04`` followed by the SHA-256 digest of the domain
    repeated twice (65 bytes in total). It is derived purely from the name
    and is not generated from any private key.

    Args:
        domain: Name whose encoded bytes seed the fake key material.

    Returns:
        ``SEQUENCE { _EC_ALG, BIT STRING key }`` as DER bytes.
    """
    digest = hashlib.sha256(domain.encode()).digest()
    fake_point = b"\x04" + digest * 2
    return _seq(_EC_ALG, _bit(fake_point))
def _make_tbs_log_entry(domain: str) -> bytes:
    """Build the DER log entry for *domain* that is fed to the issuance log.

    The entry is a SEQUENCE of: issuer name ("Example CA"), validity
    (2026-01-01 through 2027-01-01, both at 00:00:00), subject name, the EC
    algorithm identifier, and an OCTET STRING holding the SHA-256 hash of the
    subject's SPKI (the hash, not the SPKI itself).

    Args:
        domain: Subject common name; also seeds the fake public key.

    Returns:
        The DER bytes of the entry.
    """
    spki_hash = hashlib.sha256(_make_ec_spki(domain)).digest()
    not_before = _gt(2026, 1, 1, 0, 0, 0)
    not_after = _gt(2027, 1, 1, 0, 0, 0)
    return _seq(
        _name("Example CA"),
        _seq(not_before, not_after),
        _name(domain),
        _EC_ALG,
        _oct(spki_hash),
    )
def _make_tbs_cert(serial: int, domain: str) -> bytes:
    """Build a TBSCertificate-shaped DER SEQUENCE for *domain*.

    Fields, in order: explicit ``[0]`` version holding INTEGER 2, serial
    number, ECDSA signature algorithm, issuer name ("Example CA"), validity
    (2026-01-01 through 2027-01-01), subject name, and the subject's SPKI.

    Args:
        serial: Certificate serial number.
        domain: Subject common name; also seeds the fake public key.

    Returns:
        The DER bytes of the SEQUENCE.
    """
    fields = [
        _explicit(0, _int(2)),
        _int(serial),
        _ECDSA_ALG,
        _name("Example CA"),
        _seq(_gt(2026, 1, 1, 0, 0, 0), _gt(2027, 1, 1, 0, 0, 0)),
        _name(domain),
        _make_ec_spki(domain),
    ]
    return _seq(*fields)
def demo_hash_algorithm() -> None:
    """Print and sanity-check the accessors of ``mtc.HashAlgorithm``.

    Covers ``name()``, ``output_size()`` and ``oid()`` on ``Sha256``, equality
    between members, and the name/size of ``Sha384``.
    """
    section("1 — HashAlgorithm: name, output_size, oid")

    sha256 = mtc.HashAlgorithm.Sha256
    sha384 = mtc.HashAlgorithm.Sha384

    print(f" algorithm : {sha256!r}")
    print(f" name() : {sha256.name()}")
    print(f" output_size() : {sha256.output_size()} bytes")
    print(f" oid() : {sha256.oid()}")

    assert sha256.name() == "SHA-256"
    assert sha256.output_size() == 32
    assert sha256.oid() == _SHA256_OID

    # Members compare equal to themselves and unequal to each other.
    assert sha256 == mtc.HashAlgorithm.Sha256
    assert sha256 != sha384

    print(f" Sha384 name: {sha384.name()} "
          f"({sha384.output_size()} bytes)")
def demo_issuance_log() -> tuple[bytes, bytes, list[bytes], int]:
    """Build a two-entry issuance log and exercise its derived outputs.

    Adds entries for alice and bob, then checks the tree size, leaf hashes,
    Merkle root, a checkpoint at a fixed Unix timestamp, and one proof per
    entry.

    Returns:
        ``(checkpoint_der, root, leaf_hashes, tree_size)`` for the later
        demos.
    """
    section("2 — IssuanceLogBuilder: build log, checkpoint, inclusion proofs")

    log = mtc.IssuanceLogBuilder()
    log.hash_algorithm(mtc.HashAlgorithm.Sha256)
    log.log_id(_LOG_ID_DER)
    # A fresh builder already reports size 1 (the "null entry").
    print(f" Initial tree_size (null entry only): {log.tree_size()}")
    assert log.tree_size() == 1

    # alice lands at index 1 and bob at index 2, after the null entry.
    for domain in ("alice.example.com", "bob.example.com"):
        log.add_entry(_make_tbs_log_entry(domain))
    print(f" After adding 2 entries, tree_size: {log.tree_size()}")
    assert log.tree_size() == 3

    leaf_hashes = log.compute_leaf_hashes()
    print(f" compute_leaf_hashes(): {len(leaf_hashes)} hashes, "
          f"each {len(leaf_hashes[0])} bytes")
    assert len(leaf_hashes) == 3
    assert len(leaf_hashes[0]) == 32

    root = log.compute_root()
    print(f" Merkle root: {root.hex()}")
    assert len(root) == 32

    # A fixed timestamp keeps the checkpoint reproducible between runs.
    unix_ts = 1_751_328_000
    checkpoint_der = log.checkpoint_at_unix(unix_ts)
    print(f" checkpoint_at_unix({unix_ts}): {len(checkpoint_der)} bytes DER")

    checkpoint = mtc.Checkpoint.from_der(checkpoint_der)
    print(f" tree_size : {checkpoint.tree_size}")
    print(f" root_value : {checkpoint.root_value.hex()}")
    print(f" timestamp : {checkpoint.timestamp}")
    assert checkpoint.tree_size == 3
    assert checkpoint.root_value == root

    alice_proof = log.generate_proof(1)
    bob_proof = log.generate_proof(2)
    print(f" generate_proof(1) — alice: {len(alice_proof)} sibling hashes")
    print(f" generate_proof(2) — bob: {len(bob_proof)} sibling hashes")

    print(f" repr: {log!r}")
    return checkpoint_der, root, leaf_hashes, log.tree_size()
def demo_mtc_proof(leaf_hashes: list[bytes]) -> None:
    """Decode a hand-assembled ``MtcProof`` and check it re-encodes identically.

    Args:
        leaf_hashes: Leaf hashes from the issuance log; the first one is
            used as the single inclusion-proof value.
    """
    section("3 — MtcProof: decode / encode round-trip")

    sibling = leaf_hashes[0]
    # NOTE(review): the field widths come from mtc.MtcProof's wire format,
    # which is not visible in this file. The assertions below only establish
    # that these bytes decode to start=0, end=2, a 32-byte inclusion proof
    # and no signatures.
    raw = b"".join((
        b"\x00\x00",
        b"\x00\x00\x00\x00\x00\x00",
        b"\x00\x00\x00\x00\x00\x02",
        len(sibling).to_bytes(2, "big"),  # big-endian length prefix
        sibling,
        b"\x00\x00",
    ))

    proof = mtc.MtcProof.decode(raw)
    print(f" MtcProof.decode: {proof!r}")
    print(f" start : {proof.start}")
    print(f" end : {proof.end}")
    print(f" inclusion_proof : {len(proof.inclusion_proof)} bytes")
    print(f" signatures : {len(proof.signatures)} records")
    assert proof.start == 0
    assert proof.end == 2
    assert len(proof.inclusion_proof) == 32
    assert len(proof.signatures) == 0

    round_tripped = proof.encode()
    assert round_tripped == raw, "MtcProof encode/decode round-trip failed"
    print(f" encode() round-trip: OK ({len(round_tripped)} bytes)")
def demo_serial_helpers() -> None:
    """Show how a serial number splits into log number and leaf index.

    The serial is composed as ``(log_number << 48) | leaf_index`` and the two
    ``mtc.extract_*`` helpers are checked to recover both parts.
    """
    section("4 — extract_leaf_index / extract_log_number (spec §6.1)")

    log_number, leaf_index = 3, 42
    serial = (log_number << 48) | leaf_index

    extracted_leaf = mtc.extract_leaf_index(serial)
    extracted_log = mtc.extract_log_number(serial)
    print(f" serial = (log_number={log_number} << 48) | leaf_index={leaf_index}")
    print(f" extract_leaf_index({serial}) = {extracted_leaf}")
    print(f" extract_log_number({serial}) = {extracted_log}")
    assert extracted_leaf == leaf_index
    assert extracted_log == log_number

    # A serial with nothing above bit 47: log number 0, leaf index 1.
    small_serial = 1
    assert mtc.extract_leaf_index(small_serial) == 1
    assert mtc.extract_log_number(small_serial) == 0
    print(f" leaf_index of serial=1: {mtc.extract_leaf_index(small_serial)}")
    print(f" log_number of serial=1: {mtc.extract_log_number(small_serial)}")
def demo_revoked_ranges() -> None:
    """Exercise ``mtc.RevokedRanges``: adding, merging and membership tests.

    Two disjoint ranges are added, then a third that overlaps both; the
    assertions check that the three collapse into one range covering 10–40
    inclusive.
    """
    section("5 — RevokedRanges: add, merge, is_revoked")

    revoked = mtc.RevokedRanges()
    assert revoked.is_empty()

    revoked.add(10, 20)
    revoked.add(30, 40)
    print(f" After add(10,20) and add(30,40): {revoked.ranges()}")
    assert len(revoked) == 2
    for index, expected in ((15, True), (35, True), (5, False), (25, False)):
        assert revoked.is_revoked(index) is expected

    # 18–32 overlaps both existing ranges and bridges the gap between them.
    revoked.add(18, 32)
    print(f" After add(18,32) — overlapping merge: {revoked.ranges()}")
    assert len(revoked) == 1
    for index, expected in ((25, True), (9, False), (41, False)):
        assert revoked.is_revoked(index) is expected
    assert revoked.contains_range(10, 40) is True
    assert revoked.contains_range(5, 40) is False

    print(f" is_revoked(25): {revoked.is_revoked(25)} (should be True)")
    print(f" is_revoked(9): {revoked.is_revoked(9)} (should be False)")
    print(f" contains_range(10,40): {revoked.contains_range(10, 40)}")
    print(f" len(revoked): {len(revoked)} (1 merged range)")
    print(f" repr: {revoked!r}")
def demo_standalone_and_validate(
    cp_der: bytes,
    root: bytes,
    leaf_hashes: list[bytes],
    tree_size: int,
) -> None:
    """Assemble alice's StandaloneCertificate, validate it, then revoke it.

    Runs three steps: (6a) build and re-parse the certificate, (6b) validate
    it against a trust anchor under a permissive policy, (6c) confirm that
    validation fails once the anchor revokes leaf index 1.

    Args:
        cp_der: DER checkpoint produced by the issuance log.
        root: Merkle root of the log.
        leaf_hashes: All leaf hashes of the log, passed to the builder.
        tree_size: Number of leaves in the log.

    Raises:
        AssertionError: If a check fails, including when validation against
            the revoked anchor unexpectedly succeeds.
    """
    section("6a — StandaloneCertificateBuilder: assemble for alice.example.com")

    # Log number 0, leaf index 1: alice is the first entry after the null one.
    alice_serial = (0 << 48) | 1
    tbs_cert_der = _make_tbs_cert(alice_serial, "alice.example.com")

    # Subtree-signature record: cosigner OID, the subtree as
    # (0, tree_size, root), the checkpoint, algorithm and a dummy signature.
    cosigner_der = _oid(_COSIGNER_OID)
    subtree_der = _seq(_int(0), _int(tree_size), _oct(root))
    subsig_der = _seq(
        cosigner_der, subtree_der, cp_der, _ECDSA_ALG, _bit(_DUMMY_SIG)
    )

    sc_der = (
        mtc.StandaloneCertificateBuilder()
        .tbs_certificate(tbs_cert_der)
        .log_entry_index(1)
        .tree_leaves(leaf_hashes)
        .hash_algorithm(mtc.HashAlgorithm.Sha256)
        .add_subtree_signature(subsig_der)
        .signature_algorithm(_ECDSA_ALG)
        .signature(_DUMMY_SIG)
        .build()
    )
    print(f" StandaloneCertificate DER: {len(sc_der)} bytes")

    sc = mtc.StandaloneCertificate.from_der(sc_der)
    print(f" tbs_certificate_der: {len(sc.tbs_certificate_der)} bytes")
    print(f" signature_algorithm_oid: {sc.signature_algorithm_oid}")
    print(f" signature: {len(sc.signature)} bytes (placeholder)")
    print(f" subtree_signatures: {len(sc.subtree_signatures)} record(s)")
    print(f" inclusion_proof: {sc.inclusion_proof!r}")
    assert sc.signature_algorithm_oid == _ECDSA_OID
    assert len(sc.subtree_signatures) == 1

    section("6b — CertificateValidator: validate alice's StandaloneCertificate")

    anchor = mtc.TrustAnchor.from_log_id_der(_LOG_ID_DER, "Example CA Log")
    print(f" TrustAnchor: {anchor!r}")
    print(f" name : {anchor.name}")
    print(f" active: {anchor.active}")

    policy = mtc.ValidationPolicy.permissive()
    print(f" ValidationPolicy.permissive(): {policy!r}")
    print(f" min_cosignatures: {policy.min_cosignatures}")
    print(f" check_expiration: {policy.check_expiration}")

    validator = mtc.CertificateValidator(policy)
    validator.add_trust_anchor(anchor)
    print(f" trust_anchor_count: {validator.trust_anchor_count()}")

    # Any failure raises, so reaching the print means the certificate passed.
    validator.validate_standalone(sc_der)
    print(" validate_standalone: certificate is VALID")

    section("6c — Revocation: reject a revoked leaf index via TrustAnchor")

    anchor_revoked = mtc.TrustAnchor.from_log_id_der(
        _LOG_ID_DER, "Example CA Log (revoked)"
    )
    anchor_revoked.revoke_range(1, 1)  # revoke only alice's leaf index
    assert anchor_revoked.is_revoked(1) is True
    print(f" anchor.is_revoked(1): {anchor_revoked.is_revoked(1)} (True — alice revoked)")
    print(f" anchor.is_revoked(2): {anchor_revoked.is_revoked(2)} (False — bob not revoked)")

    validator_rev = mtc.CertificateValidator(policy)
    validator_rev.add_trust_anchor(anchor_revoked)

    # Only the call under test sits inside the try; the "should have failed"
    # error is raised from the else branch, outside the except's reach.
    try:
        validator_rev.validate_standalone(sc_der)
    except ValueError as exc:
        assert "revoked" in str(exc).lower(), f"unexpected error: {exc}"
        print(f" validate_standalone (revoked anchor) raised ValueError: {exc}")
    else:
        raise AssertionError("expected ValueError — certificate was revoked")

    print(" Revocation check: OK")
def main() -> None:
    """Run every demo in order, passing the issuance-log outputs onwards."""
    print("=== MTC builder and validator example ===")

    demo_hash_algorithm()

    # The log demo produces what the proof and certificate demos consume.
    checkpoint_der, merkle_root, leaf_hashes, tree_size = demo_issuance_log()

    demo_mtc_proof(leaf_hashes)
    demo_serial_helpers()
    demo_revoked_ranges()
    demo_standalone_and_validate(
        cp_der=checkpoint_der,
        root=merkle_root,
        leaf_hashes=leaf_hashes,
        tree_size=tree_size,
    )

    print("\nAll steps completed successfully.")
# Run the demos only when executed as a script, not when imported.
if __name__ == "__main__":
    main()