import synta
import synta.mtc as mtc
def section(title):
    """Print *title* framed above and below by a 60-character rule.

    A blank line is emitted first so consecutive sections are separated.
    """
    rule = "─" * 60
    print(f"\n{rule}\n{title}\n{rule}")
def _enc(fn):
    """Run *fn* against a fresh DER encoder and return the bytes it produced.

    Args:
        fn: Callable taking the encoder as its only argument; its return
            value is ignored.

    Returns:
        Whatever ``Encoder.finish()`` returns after *fn* has run.
    """
    encoder = synta.Encoder(synta.Encoding.DER)
    fn(encoder)
    return encoder.finish()
def _seq(*parts):
    """Concatenate *parts* and encode the result as one DER SEQUENCE.

    Args:
        *parts: Byte strings (callers in this file pass already-encoded DER
            elements) joined in order to form the SEQUENCE body.

    Returns:
        The encoder output for the SEQUENCE.
    """
    body = b"".join(parts)
    return _enc(lambda encoder: encoder.encode_sequence(body))
def _explicit(n, content):
    """Encode *content* under an explicit context-class tag numbered *n*.

    Args:
        n: Tag number handed to ``encode_explicit_tag``.
        content: Payload to wrap (presumably the DER of the inner value;
            confirm against the synta encoder documentation).

    Returns:
        The encoder output for the tagged element.
    """
    def write(encoder):
        encoder.encode_explicit_tag(n, "Context", content)

    return _enc(write)
def _implicit(n, inner_der):
    """Re-tag a single DER element with the context-specific tag ``[n]``.

    The identifier octet of *inner_der* is replaced by a context-class
    identifier that keeps the original primitive/constructed bit, which is
    what IMPLICIT tagging does. The content octets are copied unchanged and
    the length is re-encoded from the bytes actually copied.

    Args:
        n: Context tag number. Only the low-tag-number form is produced, so
            it must be in ``0..30``.
        inner_der: DER element (single-octet identifier, definite length in
            short or long form) to re-tag. Bytes beyond the declared length
            are ignored.

    Returns:
        The re-tagged element as ``bytes``.

    Raises:
        ValueError: If *n* is out of range, *inner_der* is too short to hold
            a header, uses a multi-octet identifier, or uses the
            indefinite-length form (which is not valid DER).
    """
    # Values above 30 would spill into the constructed/class bits of the
    # identifier octet and silently produce a different tag.
    if not 0 <= n <= 30:
        raise ValueError(f"context tag number must be in 0..30, got {n!r}")
    if len(inner_der) < 2:
        raise ValueError("inner_der is too short to contain a tag and length")

    orig_tag = inner_der[0]
    if orig_tag & 0x1F == 0x1F:
        raise ValueError("multi-octet (high-tag-number) identifiers are not supported")

    first_len = inner_der[1]
    if first_len < 0x80:
        # Short form: the octet is the length itself.
        offset, length = 2, first_len
    else:
        # Long form: low seven bits give the number of length octets.
        num_len_octets = first_len & 0x7F
        if num_len_octets == 0:
            raise ValueError("indefinite length is not valid DER")
        offset = 2 + num_len_octets
        if len(inner_der) < offset:
            raise ValueError("inner_der is truncated inside its length octets")
        length = int.from_bytes(inner_der[2:offset], "big")

    content = inner_der[offset:offset + length]

    # Re-encode the length from the copied content so the output is always
    # self-consistent, using the minimal form DER requires.
    size = len(content)
    if size < 0x80:
        length_octets = bytes([size])
    else:
        size_bytes = size.to_bytes((size.bit_length() + 7) // 8, "big")
        length_octets = bytes([0x80 | len(size_bytes)]) + size_bytes

    constructed = orig_tag & 0x20
    ctx_tag = 0x80 | constructed | n
    return bytes([ctx_tag]) + length_octets + content
def _int_der(v):
    """Return the DER produced by ``encode_integer`` for *v*."""
    def write(encoder):
        encoder.encode_integer(v)

    return _enc(write)
def _oct_der(b):
    """Return the DER produced by ``encode_octet_string`` for the bytes *b*."""
    def write(encoder):
        encoder.encode_octet_string(b)

    return _enc(write)
def _bit_der(b):
    """Return the DER produced by ``encode_bit_string`` for the bytes *b*.

    The bytes are wrapped in ``synta.BitString(b, 0)``; the second argument
    is presumably the unused-bit count (confirm against the synta docs).
    """
    def write(encoder):
        encoder.encode_bit_string(synta.BitString(b, 0))

    return _enc(write)
def _bool_der(v):
    """Return the DER produced by ``encode_boolean`` for *v*."""
    def write(encoder):
        encoder.encode_boolean(v)

    return _enc(write)
def _oid_der(s):
    """Return the DER produced by ``encode_oid`` for the OID string *s*.

    Callers in this file pass dotted-decimal strings such as ``"2.5.4.3"``.
    """
    def write(encoder):
        encoder.encode_oid(synta.ObjectIdentifier(s))

    return _enc(write)
def _gt_der(year, month, day, hour, minute, second):
    """Return the DER produced by ``encode_generalized_time`` for the given
    date and time.

    The six components are forwarded positionally to
    ``synta.GeneralizedTime`` followed by ``None`` as the final argument
    (its meaning is not visible here; confirm against the synta docs).
    """
    def write(encoder):
        stamp = synta.GeneralizedTime(
            year, month, day, hour, minute, second, None
        )
        encoder.encode_generalized_time(stamp)

    return _enc(write)
# DER NULL; used below as the second element of the RSA algorithm identifier.
NULL_DER = _enc(lambda e: e.encode_null())

# Algorithm identifiers: each is a SEQUENCE that starts with the algorithm OID.
SHA256_OID = "2.16.840.1.101.3.4.2.1"
SHA256_ALG_ID = _seq(_oid_der(SHA256_OID))  # OID only

ECDSA_SHA256_OID = "1.2.840.10045.4.3.2"
ECDSA_ALG_ID = _seq(_oid_der(ECDSA_SHA256_OID))  # OID only

RSA_OID = "1.2.840.113549.1.1.1"
RSA_ALG_ID = _seq(_oid_der(RSA_OID), NULL_DER)  # OID followed by NULL

# Public-key fixture: the RSA algorithm identifier plus a BIT STRING holding
# four placeholder bytes.
SPKI_DER = _seq(RSA_ALG_ID, _bit_der(b"\x00\x01\x02\x03"))

# LogID fixture: hash algorithm identifier followed by the public-key fixture.
LOG_ID_DER = _seq(SHA256_ALG_ID, SPKI_DER)
def _name_der(cn: str) -> bytes:
    """Return a DER name structure holding a single attribute for *cn*.

    Layout built here::

        SEQUENCE {                      -- returned value
          SET {                         -- rdn
            SEQUENCE {                  -- atav
              OBJECT IDENTIFIER 2.5.4.3,
              UTF8String cn
            }
          }
        }

    Args:
        cn: Text stored as the UTF8String attribute value.

    Returns:
        The DER bytes of the outer SEQUENCE.
    """
    # Attribute type and value, built with the same helpers the rest of the
    # file uses instead of hand-rolled encoder setup.
    atav = _seq(
        _oid_der("2.5.4.3"),
        _enc(lambda e: e.encode_utf8_string(cn)),
    )
    rdn = _enc(lambda e: e.encode_set(atav))
    return _seq(rdn)
# CosignerID fixture: built from the same two elements as LOG_ID_DER.
COSIGNER_DER = _seq(SHA256_ALG_ID, SPKI_DER)

# Subtree fixture: two INTEGERs (0 and 1024) and a 32-byte OCTET STRING of 0xCC.
SUBTREE_DER = _seq(
    _int_der(0),
    _int_der(1024),
    _oct_der(b"\xcc" * 32),
)

# Checkpoint fixture: LogID, INTEGER 1024, a 32-byte OCTET STRING of 0xDE and
# a GeneralizedTime for 2026-01-01 00:00:00.
CHECKPOINT_DER = _seq(
    LOG_ID_DER,
    _int_der(1024),
    _oct_der(b"\xde" * 32),
    _gt_der(2026, 1, 1, 0, 0, 0),
)
def demo_proof_node():
    """Parse two ProofNode values and print their hashes, repr and equality."""
    section("ProofNode — a sibling hash in the inclusion path")

    # Each ProofNode is parsed from an OCTET STRING carrying a 32-byte hash.
    node1_der = _oct_der(b"\xab" * 32)
    node1 = mtc.ProofNode.from_der(node1_der)
    assert node1.hash == b"\xab" * 32

    node2_der = _oct_der(b"\xcd" * 32)
    node2 = mtc.ProofNode.from_der(node2_der)
    assert node2.hash == b"\xcd" * 32

    print(f" node1 hash: {node1.hash.hex()[:16]}... ({len(node1.hash)} bytes)")
    print(f" node2 hash: {node2.hash.hex()[:16]}... ({len(node2.hash)} bytes)")
    print(f" node1 repr: {repr(node1)}")
    # Compare against a second object parsed from the same bytes.
    print(f" equality: node1 == node1? {node1 == mtc.ProofNode.from_der(node1_der)}")
def demo_subtree():
    """Parse the shared Subtree fixture and a second subtree, printing both."""
    section("Subtree — aggregated hash over a leaf range")

    st = mtc.Subtree.from_der(SUBTREE_DER)
    # The fixture encodes start=0, end=1024 and a 32-byte value of 0xCC.
    assert (st.start, st.end) == (0, 1024)
    assert st.value == b"\xcc" * 32

    print(f" start: {st.start}")
    print(f" end: {st.end}")
    print(f" value: {st.value.hex()[:16]}... ({len(st.value)} bytes)")
    print(f" repr: {repr(st)}")

    # A second subtree whose start equals the first one's end.
    st2_der = _seq(
        _int_der(1024),
        _int_der(2048),
        _oct_der(b"\xdd" * 32),
    )
    st2 = mtc.Subtree.from_der(st2_der)
    assert (st2.start, st2.end) == (1024, 2048)
    print(f" second subtree: [{st2.start}, {st2.end})")
def demo_subtree_proof():
    """Parse a SubtreeProof with both lists present, then an empty one."""
    section("SubtreeProof — optional left and right subtree lists")

    # One subtree per side: [0, 64) on the left and [64, 128) on the right.
    st_left = _seq(_int_der(0), _int_der(64), _oct_der(b"\xaa" * 32))
    st_right = _seq(_int_der(64), _int_der(128), _oct_der(b"\xbb" * 32))

    # Each side is wrapped in its own SEQUENCE before the outer SEQUENCE.
    sp_both_der = _seq(
        _seq(st_left),
        _seq(st_right),
    )
    sp_both = mtc.SubtreeProof.from_der(sp_both_der)

    assert sp_both.left_subtrees is not None
    assert sp_both.right_subtrees is not None
    assert len(sp_both.left_subtrees) == 1
    assert len(sp_both.right_subtrees) == 1
    assert sp_both.left_subtrees[0].start == 0
    assert sp_both.right_subtrees[0].start == 64

    print(f" both fields: left={len(sp_both.left_subtrees)} subtree(s), "
          f"right={len(sp_both.right_subtrees)} subtree(s)")
    print(f" left[0]: [{sp_both.left_subtrees[0].start}, {sp_both.left_subtrees[0].end})")
    print(f" right[0]: [{sp_both.right_subtrees[0].start}, {sp_both.right_subtrees[0].end})")
    print(f" repr: {repr(sp_both)}")

    # An empty outer SEQUENCE is expected to leave both lists as None.
    sp_empty = mtc.SubtreeProof.from_der(_seq())
    assert sp_empty.left_subtrees is None
    assert sp_empty.right_subtrees is None
    print(f" empty: left={sp_empty.left_subtrees}, right={sp_empty.right_subtrees}")
def demo_inclusion_proof():
    """Parse an InclusionProof with three path nodes and print its fields."""
    section("InclusionProof — log inclusion proof with sibling hashes")

    # Three sibling hashes, each 32 repetitions of a single fill byte.
    path_der = _seq(
        *(_oct_der(bytes([fill]) * 32) for fill in (0x11, 0x22, 0x33))
    )
    ip_der = _seq(_int_der(0), _int_der(16), path_der)
    ip = mtc.InclusionProof.from_der(ip_der)

    assert ip.subtree_start == 0
    assert ip.subtree_end == 16
    path = ip.inclusion_path
    assert len(path) == 3

    print(f" subtree_start: {ip.subtree_start}")
    print(f" subtree_end: {ip.subtree_end}")
    print(f" inclusion_path: {len(path)} nodes")
    for i, node in enumerate(path):
        print(f" [{i}] hash: {node.hash.hex()[:16]}...")
    print(f" repr: {repr(ip)}")
def demo_log_id():
    """Parse the LogID fixture and print its hash algorithm and public key."""
    section("LogID — log identifier (hash algorithm + public key)")

    lid = mtc.LogID.from_der(LOG_ID_DER)
    assert lid.hash_algorithm_oid == SHA256_OID

    pk_der = lid.public_key_der
    # The key is expected to be non-empty and to start with a SEQUENCE tag.
    assert len(pk_der) > 0
    assert pk_der[0] == 0x30

    print(f" hash_algorithm_oid: {lid.hash_algorithm_oid} (sha-256)")
    print(f" public_key_der: {pk_der.hex()} ({len(pk_der)} bytes, SPKI)")
    print(f" repr: {repr(lid)}")
    # Compare against a second object parsed from the same bytes.
    print(f" equality: lid == lid? {lid == mtc.LogID.from_der(LOG_ID_DER)}")
def demo_cosigner_id():
    """Parse the CosignerID fixture and print its fields."""
    section("CosignerID — hash algorithm + public key (mirrors LogID)")

    cos = mtc.CosignerID.from_der(COSIGNER_DER)
    assert cos.hash_algorithm_oid == SHA256_OID
    # 0x30 is the SEQUENCE tag expected at the start of the key encoding.
    assert cos.public_key_der[0] == 0x30

    print(f" hash_algorithm_oid: {cos.hash_algorithm_oid} (sha-256)")
    print(f" public_key_der: {cos.public_key_der.hex()} ({len(cos.public_key_der)} bytes)")
    print(f" repr: {repr(cos)}")
def demo_checkpoint():
    """Parse a Checkpoint without and then with the minimum-index field."""
    section("Checkpoint — signed Merkle tree snapshot")

    # The shared fixture omits the minimum index, so it should parse as None.
    cp = mtc.Checkpoint.from_der(CHECKPOINT_DER)
    assert cp.tree_size == 1024
    assert cp.tree_minimum_index is None
    assert cp.root_value == b"\xde" * 32
    assert cp.timestamp == "20260101000000Z"
    assert cp.log_id.hash_algorithm_oid == SHA256_OID

    print(f" tree_size: {cp.tree_size}")
    print(f" tree_minimum_index: {cp.tree_minimum_index}")
    print(f" root_value: {cp.root_value.hex()[:16]}... ({len(cp.root_value)} bytes)")
    print(f" timestamp: {cp.timestamp}")
    print(f" log_id.hash_alg: {cp.log_id.hash_algorithm_oid}")
    print(f" repr: {repr(cp)}")

    # Second encoding: an extra INTEGER (1024) sits right after the tree size.
    cp2_der = _seq(
        LOG_ID_DER,
        _int_der(2048),
        _int_der(1024),
        _oct_der(b"\xef" * 32),
        _gt_der(2026, 6, 1, 12, 0, 0),
    )
    cp2 = mtc.Checkpoint.from_der(cp2_der)
    assert cp2.tree_size == 2048
    assert cp2.tree_minimum_index == 1024
    print(f"\n (with treeMinimumIndex) tree_size={cp2.tree_size}, "
          f"tree_minimum_index={cp2.tree_minimum_index}")
def demo_subtree_signature():
    """Parse a SubtreeSignature built from the shared fixtures and print it."""
    section("SubtreeSignature — cosigner signature over subtree + checkpoint")

    # 4 bytes repeated 8 times gives the 32-byte placeholder signature.
    sig_bytes = b"\xca\xfe\xba\xbe" * 8
    ss_der = _seq(
        COSIGNER_DER,
        SUBTREE_DER,
        CHECKPOINT_DER,
        ECDSA_ALG_ID,
        _bit_der(sig_bytes),
    )
    ss = mtc.SubtreeSignature.from_der(ss_der)

    assert ss.signature_algorithm_oid == ECDSA_SHA256_OID
    assert ss.cosigner.hash_algorithm_oid == SHA256_OID
    assert ss.subtree.start == 0
    assert ss.subtree.end == 1024
    assert ss.checkpoint.tree_size == 1024
    assert len(ss.signature) == 32

    print(f" signature_algorithm_oid: {ss.signature_algorithm_oid}")
    print(f" cosigner.hash_algorithm_oid: {ss.cosigner.hash_algorithm_oid}")
    print(f" subtree: [{ss.subtree.start}, {ss.subtree.end})")
    print(f" checkpoint.tree_size: {ss.checkpoint.tree_size}")
    print(f" checkpoint.timestamp: {ss.checkpoint.timestamp}")
    print(f" signature: {ss.signature.hex()[:16]}... ({len(ss.signature)} bytes)")
    print(f" repr: {repr(ss)}")
def demo_merkle_tree_cert_entry():
    """Parse the NullEntry alternative of MerkleTreeCertEntry and print it."""
    section("MerkleTreeCertEntry — CHOICE: NullEntry or TbsCertEntry")

    # Identifier octet 0x80 followed by a zero length: an empty element.
    null_entry_der = bytes([0x80, 0x00])
    ce_null = mtc.MerkleTreeCertEntry.from_der(null_entry_der)

    assert ce_null.variant == "NullEntry"
    assert ce_null.tbs_cert_entry is None

    print(f" NullEntry variant: {ce_null.variant}")
    print(f" tbs_cert_entry: {ce_null.tbs_cert_entry}")
    print(f" repr: {repr(ce_null)}")
def demo_landmark_id():
    """Parse a LandmarkID (LogID fixture plus tree size 4096) and print it."""
    section("LandmarkID — LogID + tree size at landmark issuance")

    lm_id_der = _seq(
        LOG_ID_DER,
        _int_der(4096),
    )
    lm_id = mtc.LandmarkID.from_der(lm_id_der)

    assert lm_id.tree_size == 4096
    assert lm_id.log_id.hash_algorithm_oid == SHA256_OID

    print(f" tree_size: {lm_id.tree_size}")
    print(f" log_id.hash_alg_oid: {lm_id.log_id.hash_algorithm_oid}")
    print(f" log_id.public_key_der: {len(lm_id.log_id.public_key_der)} bytes (SPKI)")
    print(f" repr: {repr(lm_id)}")
def main():
    """Print the example banner, run every demo in order, then a footer."""
    banner = "=" * 60
    print(banner)
    print("Example 21: Merkle Tree Certificate (MTC) ASN.1 types")
    print(banner)

    # Order matters only for the printed output; the demos are independent.
    demos = (
        demo_proof_node,
        demo_subtree,
        demo_subtree_proof,
        demo_inclusion_proof,
        demo_log_id,
        demo_cosigner_id,
        demo_checkpoint,
        demo_subtree_signature,
        demo_merkle_tree_cert_entry,
        demo_landmark_id,
    )
    for demo in demos:
        demo()

    print("\nAll MTC examples completed.")
# Run the demos only when executed as a script, not when imported.
if __name__ == "__main__":
    main()