# 31. `example_mtc.py` — Merkle Tree Certificates
[← Example index](index.md) · [example_mtc.py on Codeberg](https://codeberg.org/abbra/synta/src/branch/main/examples/example_mtc.py)
Bindings: `synta.mtc.ProofNode`, `synta.mtc.Subtree`, `synta.mtc.SubtreeProof`,
`synta.mtc.InclusionProof`, `synta.mtc.LogID`, `synta.mtc.CosignerID`,
`synta.mtc.Checkpoint`, `synta.mtc.SubtreeSignature`,
`synta.mtc.MerkleTreeCertEntry`, `synta.mtc.LandmarkID`.
- Parse `ProofNode` (left and right variants); verify `is_left` and `hash`.
- Parse `Subtree`; verify `start`, `end`, and `value`.
- Parse `SubtreeProof` with both subtree lists; verify element count.
- Parse `InclusionProof`; verify `log_entry_index`, `tree_size`, and path length.
- Parse `LogID`; verify `hash_algorithm_oid` and `public_key_der`.
- Parse `CosignerID`; verify `issuer_der` and `serial_number`.
- Parse `Checkpoint`; verify all five properties (`log_id`, `tree_size`, `tree_minimum_index`, `root_value`, `timestamp`).
- Parse `SubtreeSignature`; verify `cosigner`, `subtree`, `checkpoint`, `signature_algorithm_oid`, and `signature`.
- Parse `MerkleTreeCertEntry` in the `NullEntry` CHOICE arm (`[0] IMPLICIT NULL`);
verify `variant` and that `tbs_cert_entry` is `None`.
- Parse `LandmarkID`; verify `tree_size` and `log_id.hash_algorithm_oid`, and print its `repr()`.
## Source
```python
#!/usr/bin/env python3
"""
Example 21: Merkle Tree Certificate (MTC) ASN.1 types.
Demonstrates: synta.mtc.ProofNode, Subtree, SubtreeProof, InclusionProof,
LogID, CosignerID, Checkpoint, SubtreeSignature, MerkleTreeCertEntry,
LandmarkID — built from scratch with the synta encoder and parsed back.
Reference: draft-ietf-plants-merkle-tree-certs
"""
import synta
import synta.mtc as mtc
def section(title):
    """Print *title* framed above and below by a 60-character rule."""
    rule = "─" * 60
    print(f"\n{rule}\n{title}\n{rule}")
# ── DER building helpers ──────────────────────────────────────
def _enc(fn):
    """Run *fn* against a fresh DER encoder and return the bytes it produced.

    *fn* receives the encoder as its only argument; its return value is
    ignored.
    """
    der_encoder = synta.Encoder(synta.Encoding.DER)
    fn(der_encoder)
    return der_encoder.finish()
def _seq(*parts):
    """Join already-encoded TLVs and wrap the result in a SEQUENCE (tag 0x30)."""
    body = b"".join(parts)
    wrapper = synta.Encoder(synta.Encoding.DER)
    wrapper.encode_sequence(body)
    return wrapper.finish()
def _explicit(n, content):
    """Wrap *content* in a constructed ``[n] EXPLICIT`` context-class tag."""
    tagger = synta.Encoder(synta.Encoding.DER)
    tagger.encode_explicit_tag(n, "Context", content)
    return tagger.finish()
def _implicit(n, inner_der):
"""[n] IMPLICIT: replace the original tag byte with a context tag.
Preserves the constructed bit. Only short-form length (<128 bytes)
is handled here, which is sufficient for test vectors.
"""
orig_tag = inner_der[0]
constructed = orig_tag & 0x20
length_byte = inner_der[1]
assert length_byte < 0x80, "only short-form length supported in helper"
content = inner_der[2:2 + length_byte]
ctx_tag = 0x80 | constructed | n
return bytes([ctx_tag, len(content)]) + content
def _int_der(v):
    """Return the DER encoding of *v* as an INTEGER."""
    def emit(encoder):
        encoder.encode_integer(v)

    return _enc(emit)
def _oct_der(b):
    """Return the DER encoding of *b* as an OCTET STRING."""
    def emit(encoder):
        encoder.encode_octet_string(b)

    return _enc(emit)
def _bit_der(b):
    """Return the DER encoding of *b* as a BIT STRING.

    The value is built as ``synta.BitString(b, 0)``; the second argument is
    presumably the unused-bit count — confirm against the synta API.
    """
    def emit(encoder):
        encoder.encode_bit_string(synta.BitString(b, 0))

    return _enc(emit)
def _bool_der(v):
    """Return the DER encoding of *v* as a BOOLEAN."""
    def emit(encoder):
        encoder.encode_boolean(v)

    return _enc(emit)
def _oid_der(s):
    """Return the DER encoding of the dotted OID string *s*."""
    def emit(encoder):
        encoder.encode_oid(synta.ObjectIdentifier(s))

    return _enc(emit)
def _gt_der(year, month, day, hour, minute, second):
    """Return the DER encoding of a GeneralizedTime for the given fields.

    The value is created with ``None`` as its last constructor argument,
    i.e. without fractional seconds.
    """
    stamp = synta.GeneralizedTime(year, month, day, hour, minute, second, None)

    def emit(encoder):
        encoder.encode_generalized_time(stamp)

    return _enc(emit)
# DER encoding of ASN.1 NULL; reused below as the rsaEncryption parameters.
NULL_DER = _enc(lambda e: e.encode_null())
# ── Re-usable building blocks ─────────────────────────────────
# sha-256 AlgorithmIdentifier: SEQUENCE { OID } with the parameters field
# omitted (hash algorithms omit them).
SHA256_OID = "2.16.840.1.101.3.4.2.1"
SHA256_ALG_ID = _seq(_oid_der(SHA256_OID))
# ecdsa-with-SHA256 AlgorithmIdentifier: SEQUENCE { OID }, no parameters.
ECDSA_SHA256_OID = "1.2.840.10045.4.3.2"
ECDSA_ALG_ID = _seq(_oid_der(ECDSA_SHA256_OID))
# rsaEncryption AlgorithmIdentifier: SEQUENCE { OID, NULL } — explicit NULL
# parameters as per RFC 4055.
RSA_OID = "1.2.840.113549.1.1.1"
RSA_ALG_ID = _seq(_oid_der(RSA_OID), NULL_DER)
# Minimal SubjectPublicKeyInfo: SEQUENCE { AlgorithmIdentifier, BIT STRING }.
# The BIT STRING content is a 4-byte stub, not a real RSA public key; it is
# only meant to be parsed structurally.
SPKI_DER = _seq(RSA_ALG_ID, _bit_der(b"\x00\x01\x02\x03"))
# LogID: SEQUENCE { hashAlgorithm AlgorithmIdentifier, publicKey SubjectPublicKeyInfo }
# Shared by the Checkpoint, LogID and LandmarkID demos.
LOG_ID_DER = _seq(SHA256_ALG_ID, SPKI_DER)
# Minimal Name DER: SEQUENCE { SET { SEQUENCE { OID commonName, UTF8String "TestCA" } } }
def _name_der(cn: str) -> bytes:
    """Return a minimal X.501 Name holding one commonName (2.5.4.3) attribute.

    Layout: SEQUENCE { SET { SEQUENCE { OID, UTF8String cn } } }.
    """
    # AttributeTypeAndValue: SEQUENCE { type OID, value UTF8String }
    attr_type = _oid_der("2.5.4.3")
    attr_value = _enc(lambda e: e.encode_utf8_string(cn))
    atav = _seq(attr_type, attr_value)
    # RelativeDistinguishedName: SET wrapping the single attribute
    rdn = _enc(lambda e: e.encode_set(atav))
    # Name: SEQUENCE wrapping the single RDN
    return _seq(rdn)
# CosignerID: SEQUENCE { issuer Name, serialNumber INTEGER }
# Issuer CN "TestCA", serial 99 — the values the demos assert against.
COSIGNER_DER = _seq(_name_der("TestCA"), _int_der(99))
# Subtree: SEQUENCE { start INTEGER, end INTEGER, value OCTET STRING }
# Covers [0, 1024) with a 32-byte stub value.
SUBTREE_DER = _seq(_int_der(0), _int_der(1024), _oct_der(b"\xcc" * 32))
# Checkpoint: SEQUENCE { logID LogID, treeSize INTEGER,
# [treeMinimumIndex INTEGER,] rootValue OCTET STRING,
# timestamp GeneralizedTime }
# This shared instance omits the optional treeMinimumIndex.
CHECKPOINT_DER = _seq(
LOG_ID_DER,
_int_der(1024), # treeSize
_oct_der(b"\xde" * 32), # rootValue — 32-byte stub standing in for a SHA-256 Merkle root
_gt_der(2026, 1, 1, 0, 0, 0), # timestamp: 2026-01-01 00:00:00
)
# ── Demo functions ────────────────────────────────────────────
def demo_proof_node():
    """Parse a left and a right ProofNode and check ``is_left`` and ``hash``."""
    section("ProofNode — left/right hash in the inclusion path")

    # ProofNode: SEQUENCE { isLeft BOOLEAN, hash OCTET STRING }
    left_hash = b"\xab" * 32
    right_hash = b"\xcd" * 32
    left_der = _seq(_bool_der(True), _oct_der(left_hash))
    right_der = _seq(_bool_der(False), _oct_der(right_hash))

    left = mtc.ProofNode.from_der(left_der)
    right = mtc.ProofNode.from_der(right_der)

    assert left.is_left is True
    assert right.is_left is False
    assert left.hash == left_hash
    assert right.hash == right_hash

    print(f" left node: is_left={left.is_left}, hash={left.hash.hex()[:16]}...")
    print(f" right node: is_left={right.is_left}, hash={right.hash.hex()[:16]}...")
    print(f" left repr: {left!r}")

    # Parsing the same bytes a second time should compare equal to the first.
    reparsed = mtc.ProofNode.from_der(left_der)
    print(f" equality: left == left? {left == reparsed}")
def demo_subtree():
    """Parse two Subtree values and check ``start``, ``end`` and ``value``."""
    section("Subtree — aggregated hash over a leaf range")

    # Subtree: SEQUENCE { start INTEGER, end INTEGER, value OCTET STRING }
    first = mtc.Subtree.from_der(SUBTREE_DER)
    assert first.start == 0
    assert first.end == 1024
    assert first.value == b"\xcc" * 32

    value = first.value
    print(f" start: {first.start}")
    print(f" end: {first.end}")
    print(f" value: {value.hex()[:16]}... ({len(value)} bytes)")
    print(f" repr: {first!r}")

    # A second subtree covering the adjacent range [1024, 2048)
    second_der = _seq(_int_der(1024), _int_der(2048), _oct_der(b"\xdd" * 32))
    second = mtc.Subtree.from_der(second_der)
    assert second.start == 1024
    assert second.end == 2048
    print(f" second subtree: [{second.start}, {second.end})")
def demo_subtree_proof():
    """Parse a SubtreeProof with both lists present, then one with both absent."""
    section("SubtreeProof — optional left and right subtree lists")

    # SubtreeProof: SEQUENCE { leftSubtrees  SEQUENCE OF Subtree OPTIONAL,
    #                          rightSubtrees SEQUENCE OF Subtree OPTIONAL }
    # Both fields are plain SEQUENCEs (no context tags) and optional.
    left_item = _seq(_int_der(0), _int_der(64), _oct_der(b"\xaa" * 32))
    right_item = _seq(_int_der(64), _int_der(128), _oct_der(b"\xbb" * 32))

    # Case 1: one subtree in each list
    proof = mtc.SubtreeProof.from_der(_seq(_seq(left_item), _seq(right_item)))
    lefts = proof.left_subtrees
    rights = proof.right_subtrees
    assert lefts is not None
    assert rights is not None
    assert len(lefts) == 1
    assert len(rights) == 1
    assert lefts[0].start == 0
    assert rights[0].start == 64

    print(f" both fields: left={len(lefts)} subtree(s), "
          f"right={len(rights)} subtree(s)")
    print(f" left[0]: [{lefts[0].start}, {lefts[0].end})")
    print(f" right[0]: [{rights[0].start}, {rights[0].end})")
    print(f" repr: {proof!r}")

    # Case 2: empty outer SEQUENCE — both optional fields absent
    empty = mtc.SubtreeProof.from_der(_seq())
    assert empty.left_subtrees is None
    assert empty.right_subtrees is None
    print(f" empty: left={empty.left_subtrees}, right={empty.right_subtrees}")
def demo_inclusion_proof():
    """Parse an InclusionProof and check its index, tree size and path length."""
    section("InclusionProof — log inclusion proof with sibling hashes")

    # InclusionProof: SEQUENCE { logEntryIndex INTEGER, treeSize INTEGER,
    #                            inclusionPath SEQUENCE OF ProofNode }
    # Stub path of (isLeft, fill byte) pairs. This exercises parsing only and
    # is not a verifiable audit path: a full 16-leaf tree would need
    # log2(16) = 4 sibling hashes, not 3.
    stub_path = ((True, b"\x11"), (False, b"\x22"), (True, b"\x33"))
    nodes = [
        _seq(_bool_der(is_left), _oct_der(fill * 32))
        for is_left, fill in stub_path
    ]
    proof_der = _seq(
        _int_der(7),    # logEntryIndex
        _int_der(16),   # treeSize
        _seq(*nodes),   # inclusionPath: SEQUENCE OF ProofNode
    )

    proof = mtc.InclusionProof.from_der(proof_der)
    assert proof.log_entry_index == 7
    assert proof.tree_size == 16
    path = proof.inclusion_path
    assert len(path) == 3

    print(f" log_entry_index: {proof.log_entry_index}")
    print(f" tree_size: {proof.tree_size}")
    print(f" inclusion_path: {len(path)} nodes")
    for i, node in enumerate(path):
        if node.is_left:
            side = "left"
        else:
            side = "right"
        print(f" [{i}] {side} hash: {node.hash.hex()[:16]}...")
    print(f" repr: {proof!r}")
def demo_log_id():
    """Parse a LogID and check ``hash_algorithm_oid`` and ``public_key_der``."""
    section("LogID — log identifier (hash algorithm + public key)")

    # LogID: SEQUENCE { hashAlgorithm AlgorithmIdentifier,
    #                   publicKey SubjectPublicKeyInfo }
    log_id = mtc.LogID.from_der(LOG_ID_DER)
    assert log_id.hash_algorithm_oid == SHA256_OID

    spki = log_id.public_key_der
    assert len(spki) > 0
    # The returned bytes must start with a SEQUENCE tag (SubjectPublicKeyInfo).
    assert spki[0] == 0x30

    print(f" hash_algorithm_oid: {log_id.hash_algorithm_oid} (sha-256)")
    print(f" public_key_der: {spki.hex()} ({len(spki)} bytes, SPKI)")
    print(f" repr: {log_id!r}")

    # A second parse of the same bytes should compare equal to the first.
    reparsed = mtc.LogID.from_der(LOG_ID_DER)
    print(f" equality: lid == lid? {log_id == reparsed}")
def demo_cosigner_id():
    """Parse a CosignerID and check ``serial_number`` and ``issuer_der``."""
    section("CosignerID — issuer Name + serial number")

    # CosignerID: SEQUENCE { issuer Name, serialNumber INTEGER }
    cosigner = mtc.CosignerID.from_der(COSIGNER_DER)
    assert cosigner.serial_number == 99

    issuer = cosigner.issuer_der
    assert issuer[0] == 0x30  # a DER Name starts with a SEQUENCE tag

    # The raw issuer bytes decode to (OID, value) pairs via parse_name_attrs().
    attrs = synta.parse_name_attrs(issuer)
    assert attrs == [("2.5.4.3", "TestCA")]

    print(f" serial_number: {cosigner.serial_number}")
    print(f" issuer_der: {issuer.hex()} ({len(issuer)} bytes)")
    print(f" issuer attrs: {attrs}")
    print(f" repr: {cosigner!r}")
def demo_checkpoint():
    """Parse a Checkpoint without, then with, the optional treeMinimumIndex."""
    section("Checkpoint — signed Merkle tree snapshot")

    # Checkpoint: SEQUENCE { logID LogID, treeSize INTEGER,
    #                        [treeMinimumIndex INTEGER,] rootValue OCTET STRING,
    #                        timestamp GeneralizedTime }
    checkpoint = mtc.Checkpoint.from_der(CHECKPOINT_DER)
    assert checkpoint.tree_size == 1024
    assert checkpoint.tree_minimum_index is None
    assert checkpoint.root_value == b"\xde" * 32
    assert checkpoint.timestamp == "20260101000000Z"
    assert checkpoint.log_id.hash_algorithm_oid == SHA256_OID

    root = checkpoint.root_value
    print(f" tree_size: {checkpoint.tree_size}")
    print(f" tree_minimum_index: {checkpoint.tree_minimum_index}")
    print(f" root_value: {root.hex()[:16]}... ({len(root)} bytes)")
    print(f" timestamp: {checkpoint.timestamp}")
    print(f" log_id.hash_alg: {checkpoint.log_id.hash_algorithm_oid}")
    print(f" repr: {checkpoint!r}")

    # Second encoding: the optional treeMinimumIndex sits between treeSize
    # and rootValue.
    with_min_der = _seq(
        LOG_ID_DER,
        _int_der(2048),                 # treeSize
        _int_der(1024),                 # treeMinimumIndex (optional)
        _oct_der(b"\xef" * 32),         # rootValue
        _gt_der(2026, 6, 1, 12, 0, 0),  # timestamp
    )
    with_min = mtc.Checkpoint.from_der(with_min_der)
    assert with_min.tree_size == 2048
    assert with_min.tree_minimum_index == 1024
    print(f"\n (with treeMinimumIndex) tree_size={with_min.tree_size}, "
          f"tree_minimum_index={with_min.tree_minimum_index}")
def demo_subtree_signature():
    """Parse a SubtreeSignature and check each of its five fields."""
    section("SubtreeSignature — cosigner signature over subtree + checkpoint")

    # SubtreeSignature: SEQUENCE { cosigner CosignerID, subtree Subtree,
    #                              checkpoint Checkpoint,
    #                              signatureAlgorithm AlgorithmIdentifier,
    #                              signature BIT STRING }
    stub_signature = b"\xca\xfe\xba\xbe" * 8  # 32 stub bytes, not a real ECDSA signature
    encoded = _seq(
        COSIGNER_DER,
        SUBTREE_DER,
        CHECKPOINT_DER,
        ECDSA_ALG_ID,
        _bit_der(stub_signature),
    )

    parsed = mtc.SubtreeSignature.from_der(encoded)
    assert parsed.signature_algorithm_oid == ECDSA_SHA256_OID
    assert parsed.cosigner.serial_number == 99
    assert parsed.subtree.start == 0
    assert parsed.subtree.end == 1024
    assert parsed.checkpoint.tree_size == 1024
    assert len(parsed.signature) == 32

    sig = parsed.signature
    print(f" signature_algorithm_oid: {parsed.signature_algorithm_oid}")
    print(f" cosigner.serial_number: {parsed.cosigner.serial_number}")
    print(f" subtree: [{parsed.subtree.start}, {parsed.subtree.end})")
    print(f" checkpoint.tree_size: {parsed.checkpoint.tree_size}")
    print(f" checkpoint.timestamp: {parsed.checkpoint.timestamp}")
    print(f" signature: {sig.hex()[:16]}... ({len(sig)} bytes)")
    print(f" repr: {parsed!r}")
def demo_merkle_tree_cert_entry():
    """Parse the NullEntry arm of the MerkleTreeCertEntry CHOICE.

    Only the NullEntry arm is exercised here; for it, ``tbs_cert_entry`` is
    expected to be ``None``.
    """
    section("MerkleTreeCertEntry — CHOICE: NullEntry or TbsCertEntry")

    # NullEntry is [0] IMPLICIT NULL: the NULL encoding (05 00) with its tag
    # octet replaced by 0x80 (context class, number 0, primitive).
    null_entry_der = bytes([0x80, 0x00])
    entry = mtc.MerkleTreeCertEntry.from_der(null_entry_der)

    assert entry.variant == "NullEntry"
    assert entry.tbs_cert_entry is None

    print(f" NullEntry variant: {entry.variant}")
    print(f" tbs_cert_entry: {entry.tbs_cert_entry}")
    print(f" repr: {entry!r}")
def demo_landmark_id():
    """Parse a LandmarkID and check ``tree_size`` and the nested ``log_id``."""
    section("LandmarkID — LogID + tree size at landmark issuance")

    # LandmarkID: SEQUENCE { logID LogID, treeSize INTEGER }
    encoded = _seq(LOG_ID_DER, _int_der(4096))
    landmark = mtc.LandmarkID.from_der(encoded)

    assert landmark.tree_size == 4096
    assert landmark.log_id.hash_algorithm_oid == SHA256_OID

    nested_log_id = landmark.log_id
    print(f" tree_size: {landmark.tree_size}")
    print(f" log_id.hash_alg_oid: {nested_log_id.hash_algorithm_oid}")
    print(f" log_id.public_key_der: {len(nested_log_id.public_key_der)} bytes (SPKI)")
    print(f" repr: {landmark!r}")
def main():
    """Print the banner, run every demo in order, then print a closing line."""
    banner = "=" * 60
    print(banner)
    print("Example 21: Merkle Tree Certificate (MTC) ASN.1 types")
    print(banner)

    demos = (
        demo_proof_node,
        demo_subtree,
        demo_subtree_proof,
        demo_inclusion_proof,
        demo_log_id,
        demo_cosigner_id,
        demo_checkpoint,
        demo_subtree_signature,
        demo_merkle_tree_cert_entry,
        demo_landmark_id,
    )
    for run_demo in demos:
        run_demo()

    print("\nAll MTC examples completed.")
# Run the demos only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
```