# synta 0.1.6
#
# ASN.1 parser, decoder, and encoder library with DER/BER support and C FFI
# Documentation
#!/usr/bin/env python3
"""
Example 21: Merkle Tree Certificate (MTC) ASN.1 types.

Demonstrates: synta.mtc.ProofNode, Subtree, SubtreeProof, InclusionProof,
LogID, CosignerID, Checkpoint, SubtreeSignature, MerkleTreeCertEntry,
LandmarkID — built from scratch with the synta encoder and parsed back.

Reference: draft-ietf-plants-merkle-tree-certs
"""

import synta
import synta.mtc as mtc


def section(title):
    """Print *title* as a section heading framed by two 60-column rules.

    The output is a blank line, a rule, the title, and a second rule.

    Args:
        title: Heading text printed between the two rules.
    """
    # The rule used to be built as '' * 60, which is the empty string, so the
    # heading was surrounded by blank lines rather than a visible separator.
    rule = "-" * 60
    print(f"\n{rule}\n{title}\n{rule}")


# ── DER building helpers ──────────────────────────────────────

def _enc(fn):
    """Run *fn* against a fresh DER encoder and return the finished output.

    *fn* receives the new ``synta.Encoder`` as its only argument; whatever it
    encodes is returned via ``finish()``.
    """
    encoder = synta.Encoder(synta.Encoding.DER)
    fn(encoder)
    der = encoder.finish()
    return der


def _seq(*parts):
    """Concatenate already-encoded TLVs and wrap them in a SEQUENCE (tag 0x30)."""
    body = b"".join(parts)
    encoder = synta.Encoder(synta.Encoding.DER)
    encoder.encode_sequence(body)
    return encoder.finish()


def _explicit(n, content):
    """Return *content* wrapped in a constructed ``[n] EXPLICIT`` context tag."""
    tag_class = "Context"
    encoder = synta.Encoder(synta.Encoding.DER)
    encoder.encode_explicit_tag(n, tag_class, content)
    return encoder.finish()


def _implicit(n, inner_der):
    """Re-tag a DER element as ``[n] IMPLICIT``.

    The identifier octet of *inner_der* is replaced by a context-specific tag
    with number *n*.  The constructed bit (0x20) of the original tag is
    carried over and the content octets are copied unchanged.  Bytes that
    follow the declared content are ignored.

    Only single-octet headers are handled, which is sufficient for test
    vectors: tag numbers 0..30 and short-form lengths (< 128 content bytes).

    Args:
        n: Context tag number, 0..30.
        inner_der: A DER TLV whose tag is to be replaced.

    Returns:
        The re-tagged TLV.

    Raises:
        ValueError: If *n* is outside 0..30, or *inner_der* is shorter than a
            two-octet header, uses a high-tag-number identifier, uses a
            long-form length, or holds fewer content bytes than it declares.
    """
    # Numbers above 30 do not fit in the low five bits of one identifier
    # octet; OR-ing them in would corrupt the constructed bit or tag class.
    if not 0 <= n <= 30:
        raise ValueError(f"context tag number must be in 0..30, got {n!r}")
    if len(inner_der) < 2:
        raise ValueError("inner_der is too short to hold a tag and a length")

    orig_tag = inner_der[0]
    length = inner_der[1]
    # Low five bits all set means the tag number continues in later octets.
    if orig_tag & 0x1F == 0x1F:
        raise ValueError("only single-octet tags supported in helper")
    # Explicit raise rather than assert so the guard survives `python -O`.
    if length >= 0x80:
        raise ValueError("only short-form length supported in helper")

    content = inner_der[2:2 + length]
    if len(content) != length:
        raise ValueError(
            f"inner_der is truncated: declared {length} content bytes, "
            f"found {len(content)}"
        )

    constructed = orig_tag & 0x20
    ctx_tag = 0x80 | constructed | n
    return bytes([ctx_tag, length]) + content


def _int_der(v):
    """Return the DER encoding of *v* as an INTEGER."""
    def write(encoder):
        return encoder.encode_integer(v)

    return _enc(write)


def _oct_der(b):
    """Return the DER encoding of *b* as an OCTET STRING."""
    def write(encoder):
        return encoder.encode_octet_string(b)

    return _enc(write)


def _bit_der(b):
    """Return the DER encoding of *b* as a BIT STRING.

    The value is built as ``synta.BitString(b, 0)``.
    """
    def write(encoder):
        bits = synta.BitString(b, 0)
        return encoder.encode_bit_string(bits)

    return _enc(write)


def _bool_der(v):
    """Return the DER encoding of *v* as a BOOLEAN."""
    def write(encoder):
        return encoder.encode_boolean(v)

    return _enc(write)


def _oid_der(s):
    """Return the DER encoding of the dotted OID string *s*."""
    def write(encoder):
        oid = synta.ObjectIdentifier(s)
        return encoder.encode_oid(oid)

    return _enc(write)


def _gt_der(year, month, day, hour, minute, second):
    """Return the DER GeneralizedTime for the given date and time fields.

    The value is encoded with no fractional seconds (UTC); the final
    ``synta.GeneralizedTime`` argument is passed as ``None``.
    """
    def write(encoder):
        stamp = synta.GeneralizedTime(
            year, month, day, hour, minute, second, None
        )
        return encoder.encode_generalized_time(stamp)

    return _enc(write)


# DER encoding of an ASN.1 NULL, produced once at import time and reused.
NULL_DER = _enc(lambda encoder: encoder.encode_null())


# ── Re-usable building blocks ─────────────────────────────────

# SHA-256 hash AlgorithmIdentifier: a SEQUENCE holding only the OID, with the
# parameters field left out.
SHA256_OID = "2.16.840.1.101.3.4.2.1"
SHA256_ALG_ID = _seq(
    _oid_der(SHA256_OID),
)

# ecdsa-with-SHA256 signature AlgorithmIdentifier, likewise without parameters.
ECDSA_SHA256_OID = "1.2.840.10045.4.3.2"
ECDSA_ALG_ID = _seq(
    _oid_der(ECDSA_SHA256_OID),
)

# rsaEncryption AlgorithmIdentifier; its parameters field is an explicit NULL
# (as per RFC 4055).
RSA_OID = "1.2.840.113549.1.1.1"
RSA_ALG_ID = _seq(
    _oid_der(RSA_OID),
    NULL_DER,
)

# Stub SubjectPublicKeyInfo: SEQUENCE { AlgorithmIdentifier, BIT STRING }.
# The four key bytes are placeholders, not a real RSA public key.
SPKI_DER = _seq(
    RSA_ALG_ID,
    _bit_der(b"\x00\x01\x02\x03"),
)

# LogID: SEQUENCE { hashAlgorithm AlgorithmIdentifier,
#                   publicKey SubjectPublicKeyInfo }
LOG_ID_DER = _seq(
    SHA256_ALG_ID,
    SPKI_DER,
)

# Minimal Name DER: SEQUENCE { SET { SEQUENCE { OID commonName, UTF8String "TestCA" } } }
def _name_der(cn: str) -> bytes:
    """Return a DER Name holding one RDN with a single commonName of *cn*.

    Layout: SEQUENCE { SET { SEQUENCE { OID 2.5.4.3, UTF8String cn } } }.
    """
    common_name_oid = synta.ObjectIdentifier("2.5.4.3")
    oid_tlv = _enc(lambda encoder: encoder.encode_oid(common_name_oid))
    value_tlv = _enc(lambda encoder: encoder.encode_utf8_string(cn))

    # AttributeTypeAndValue: the two TLVs side by side inside one SEQUENCE.
    attribute = _seq(oid_tlv + value_tlv)

    # RelativeDistinguishedName: a SET around that single attribute.
    set_encoder = synta.Encoder(synta.Encoding.DER)
    set_encoder.encode_set(attribute)
    rdn = set_encoder.finish()

    # Name: a SEQUENCE around the single RDN.
    return _seq(rdn)

# CosignerID: SEQUENCE { issuer Name, serialNumber INTEGER }
COSIGNER_DER = _seq(
    _name_der("TestCA"),    # issuer
    _int_der(99),           # serialNumber
)

# Subtree: SEQUENCE { start INTEGER, end INTEGER, value OCTET STRING }
SUBTREE_DER = _seq(
    _int_der(0),              # start
    _int_der(1024),           # end
    _oct_der(b"\xcc" * 32),   # value (32 filler bytes)
)

# Checkpoint without the optional treeMinimumIndex:
# SEQUENCE { logID LogID, treeSize INTEGER, rootValue OCTET STRING,
#            timestamp GeneralizedTime }
CHECKPOINT_DER = _seq(
    LOG_ID_DER,                     # logID
    _int_der(1024),                 # treeSize
    _oct_der(b"\xde" * 32),         # rootValue (32 bytes, SHA-256 sized)
    _gt_der(2026, 1, 1, 0, 0, 0),   # timestamp
)


# ── Demo functions ────────────────────────────────────────────

def demo_proof_node():
    """Encode a left and a right ProofNode, parse both, and print the results."""
    section("ProofNode — left/right hash in the inclusion path")

    # ASN.1 shape: ProofNode ::= SEQUENCE { isLeft BOOLEAN, hash OCTET STRING }
    left_hash = b"\xab" * 32
    right_hash = b"\xcd" * 32
    left_node_der = _seq(_bool_der(True), _oct_der(left_hash))
    right_node_der = _seq(_bool_der(False), _oct_der(right_hash))

    # Parse each encoding back and confirm both fields come out unchanged.
    left_node = mtc.ProofNode.from_der(left_node_der)
    right_node = mtc.ProofNode.from_der(right_node_der)
    for node, expect_left, expect_hash in (
        (left_node, True, left_hash),
        (right_node, False, right_hash),
    ):
        assert node.is_left is expect_left
        assert node.hash == expect_hash

    print(f"  left node:  is_left={left_node.is_left},  hash={left_node.hash.hex()[:16]}...")
    print(f"  right node: is_left={right_node.is_left}, hash={right_node.hash.hex()[:16]}...")
    print(f"  left repr:  {repr(left_node)}")
    print(f"  equality:   left == left? {left_node == mtc.ProofNode.from_der(left_node_der)}")


def demo_subtree():
    """Parse the shared Subtree fixture plus a second subtree and print both."""
    section("Subtree — aggregated hash over a leaf range")

    # ASN.1 shape:
    #   Subtree ::= SEQUENCE { start INTEGER, end INTEGER, value OCTET STRING }
    st = mtc.Subtree.from_der(SUBTREE_DER)
    expected_value = b"\xcc" * 32
    assert st.start == 0
    assert st.end == 1024
    assert st.value == expected_value

    print(f"  start: {st.start}")
    print(f"  end:   {st.end}")
    print(f"  value: {st.value.hex()[:16]}...  ({len(st.value)} bytes)")
    print(f"  repr:  {repr(st)}")

    # A second subtree whose range begins where the fixture's range ends.
    st2_fields = [
        _int_der(1024),
        _int_der(2048),
        _oct_der(b"\xdd" * 32),
    ]
    st2 = mtc.Subtree.from_der(_seq(*st2_fields))
    assert st2.start == 1024
    assert st2.end == 2048
    print(f"  second subtree: [{st2.start}, {st2.end})")


def demo_subtree_proof():
    """Parse a SubtreeProof with both lists present, then one with neither."""
    section("SubtreeProof — optional left and right subtree lists")

    # ASN.1 shape: SubtreeProof ::= SEQUENCE {
    #     leftSubtrees  SEQUENCE OF Subtree OPTIONAL,
    #     rightSubtrees SEQUENCE OF Subtree OPTIONAL }
    # Neither list carries a context tag; each is a plain SEQUENCE.
    st_left = _seq(_int_der(0), _int_der(64), _oct_der(b"\xaa" * 32))
    st_right = _seq(_int_der(64), _int_der(128), _oct_der(b"\xbb" * 32))

    # Case 1: one subtree in each list.
    left_list = _seq(st_left)
    right_list = _seq(st_right)
    sp_both = mtc.SubtreeProof.from_der(_seq(left_list, right_list))
    for subtrees, first_start in (
        (sp_both.left_subtrees, 0),
        (sp_both.right_subtrees, 64),
    ):
        assert subtrees is not None
        assert len(subtrees) == 1
        assert subtrees[0].start == first_start

    print(f"  both fields: left={len(sp_both.left_subtrees)} subtree(s), "
          f"right={len(sp_both.right_subtrees)} subtree(s)")
    print(f"  left[0]:  [{sp_both.left_subtrees[0].start},  {sp_both.left_subtrees[0].end})")
    print(f"  right[0]: [{sp_both.right_subtrees[0].start}, {sp_both.right_subtrees[0].end})")
    print(f"  repr: {repr(sp_both)}")

    # Case 2: an empty outer SEQUENCE, so both optional lists are absent.
    empty_der = _seq()
    sp_empty = mtc.SubtreeProof.from_der(empty_der)
    assert sp_empty.left_subtrees is None
    assert sp_empty.right_subtrees is None
    print(f"  empty: left={sp_empty.left_subtrees}, right={sp_empty.right_subtrees}")


def demo_inclusion_proof():
    """Parse an InclusionProof with three path nodes and print each node."""
    section("InclusionProof — log inclusion proof with sibling hashes")

    # ASN.1 shape: InclusionProof ::= SEQUENCE {
    #     logEntryIndex INTEGER, treeSize INTEGER,
    #     inclusionPath SEQUENCE OF ProofNode }
    # Path fixture as (isLeft, fill byte): left, right, left siblings.
    siblings = ((True, b"\x11"), (False, b"\x22"), (True, b"\x33"))
    nodes = [
        _seq(_bool_der(is_left), _oct_der(fill * 32))
        for is_left, fill in siblings
    ]

    # Entry 7 of a 16-leaf tree carrying the three stub nodes above.  The
    # proof is only decoded here; its hashes are not verified.
    ip_der = _seq(
        _int_der(7),     # logEntryIndex
        _int_der(16),    # treeSize
        _seq(*nodes),    # inclusionPath
    )
    ip = mtc.InclusionProof.from_der(ip_der)
    path = ip.inclusion_path

    assert ip.log_entry_index == 7
    assert ip.tree_size == 16
    assert len(path) == 3

    print(f"  log_entry_index: {ip.log_entry_index}")
    print(f"  tree_size:       {ip.tree_size}")
    print(f"  inclusion_path:  {len(path)} nodes")
    for i, node in enumerate(path):
        side = "left" if node.is_left else "right"
        print(f"    [{i}] {side} hash: {node.hash.hex()[:16]}...")
    print(f"  repr: {repr(ip)}")


def demo_log_id():
    """Parse the shared LogID fixture and print its algorithm OID and key."""
    section("LogID — log identifier (hash algorithm + public key)")

    # ASN.1 shape: LogID ::= SEQUENCE {
    #     hashAlgorithm AlgorithmIdentifier, publicKey SubjectPublicKeyInfo }
    lid = mtc.LogID.from_der(LOG_ID_DER)
    pk_der = lid.public_key_der

    assert lid.hash_algorithm_oid == SHA256_OID
    # The key is checked as an encoded SubjectPublicKeyInfo: it must be
    # non-empty and open with the SEQUENCE tag (0x30).
    assert len(pk_der) > 0
    assert pk_der[0] == 0x30

    print(f"  hash_algorithm_oid: {lid.hash_algorithm_oid}  (sha-256)")
    print(f"  public_key_der:     {pk_der.hex()}  ({len(pk_der)} bytes, SPKI)")
    print(f"  repr:               {repr(lid)}")
    print(f"  equality:           lid == lid? {lid == mtc.LogID.from_der(LOG_ID_DER)}")


def demo_cosigner_id():
    """Parse the shared CosignerID fixture and decode its issuer Name."""
    section("CosignerID — issuer Name + serial number")

    # ASN.1 shape: CosignerID ::= SEQUENCE { issuer Name, serialNumber INTEGER }
    cos = mtc.CosignerID.from_der(COSIGNER_DER)
    assert cos.serial_number == 99

    # The issuer is exposed as encoded Name bytes, which start with the
    # SEQUENCE tag and can be handed to synta.parse_name_attrs().
    issuer_der = cos.issuer_der
    assert issuer_der[0] == 0x30
    attrs = synta.parse_name_attrs(cos.issuer_der)
    expected_attrs = [("2.5.4.3", "TestCA")]
    assert attrs == expected_attrs

    print(f"  serial_number: {cos.serial_number}")
    print(f"  issuer_der:    {cos.issuer_der.hex()}  ({len(cos.issuer_der)} bytes)")
    print(f"  issuer attrs:  {attrs}")
    print(f"  repr:          {repr(cos)}")


def demo_checkpoint():
    """Parse a Checkpoint without and then with the optional treeMinimumIndex."""
    section("Checkpoint — signed Merkle tree snapshot")

    # ASN.1 shape: Checkpoint ::= SEQUENCE {
    #     logID LogID, treeSize INTEGER, [treeMinimumIndex INTEGER,]
    #     rootValue OCTET STRING, timestamp GeneralizedTime }
    cp = mtc.Checkpoint.from_der(CHECKPOINT_DER)

    expected_root = b"\xde" * 32
    assert cp.tree_size == 1024
    assert cp.tree_minimum_index is None
    assert cp.root_value == expected_root
    assert cp.timestamp == "20260101000000Z"
    assert cp.log_id.hash_algorithm_oid == SHA256_OID

    print(f"  tree_size:          {cp.tree_size}")
    print(f"  tree_minimum_index: {cp.tree_minimum_index}")
    print(f"  root_value:         {cp.root_value.hex()[:16]}...  ({len(cp.root_value)} bytes)")
    print(f"  timestamp:          {cp.timestamp}")
    print(f"  log_id.hash_alg:    {cp.log_id.hash_algorithm_oid}")
    print(f"  repr:               {repr(cp)}")

    # Same structure again, this time with treeMinimumIndex included.
    cp2_fields = [
        LOG_ID_DER,                       # logID
        _int_der(2048),                   # treeSize
        _int_der(1024),                   # treeMinimumIndex (optional)
        _oct_der(b"\xef" * 32),           # rootValue
        _gt_der(2026, 6, 1, 12, 0, 0),    # timestamp
    ]
    cp2 = mtc.Checkpoint.from_der(_seq(*cp2_fields))
    assert cp2.tree_size == 2048
    assert cp2.tree_minimum_index == 1024
    print(f"\n  (with treeMinimumIndex) tree_size={cp2.tree_size}, "
          f"tree_minimum_index={cp2.tree_minimum_index}")


def demo_subtree_signature():
    """Assemble a SubtreeSignature from the shared fixtures and parse it."""
    section("SubtreeSignature — cosigner signature over subtree + checkpoint")

    # ASN.1 shape: SubtreeSignature ::= SEQUENCE {
    #     cosigner CosignerID, subtree Subtree, checkpoint Checkpoint,
    #     signatureAlgorithm AlgorithmIdentifier, signature BIT STRING }
    # The signature is a 32-byte stub (4 bytes repeated 8 times).
    stub_signature = _bit_der(b"\xca\xfe\xba\xbe" * 8)
    ss_der = _seq(
        COSIGNER_DER,
        SUBTREE_DER,
        CHECKPOINT_DER,
        ECDSA_ALG_ID,
        stub_signature,
    )
    ss = mtc.SubtreeSignature.from_der(ss_der)

    # Every nested value should match the fixture it was built from.
    assert ss.signature_algorithm_oid == ECDSA_SHA256_OID
    assert ss.cosigner.serial_number == 99
    assert ss.subtree.start == 0
    assert ss.subtree.end == 1024
    assert ss.checkpoint.tree_size == 1024
    assert len(ss.signature) == 32

    print(f"  signature_algorithm_oid: {ss.signature_algorithm_oid}")
    print(f"  cosigner.serial_number:  {ss.cosigner.serial_number}")
    print(f"  subtree:                 [{ss.subtree.start}, {ss.subtree.end})")
    print(f"  checkpoint.tree_size:    {ss.checkpoint.tree_size}")
    print(f"  checkpoint.timestamp:    {ss.checkpoint.timestamp}")
    print(f"  signature:               {ss.signature.hex()[:16]}...  ({len(ss.signature)} bytes)")
    print(f"  repr:                    {repr(ss)}")


def demo_merkle_tree_cert_entry():
    """Parse the NullEntry alternative of the MerkleTreeCertEntry CHOICE."""
    section("MerkleTreeCertEntry — CHOICE: NullEntry or TbsCertEntry")

    # NullEntry is [0] IMPLICIT NULL: the universal NULL encoding (05 00) with
    # its tag octet swapped for 0x80 (context-specific, primitive, number 0).
    null_entry_der = b"\x80\x00"
    ce_null = mtc.MerkleTreeCertEntry.from_der(null_entry_der)

    # Only the NullEntry variant is exercised here, so no TBS entry is set.
    assert ce_null.variant == "NullEntry"
    assert ce_null.tbs_cert_entry is None

    print(f"  NullEntry variant:   {ce_null.variant}")
    print(f"  tbs_cert_entry:      {ce_null.tbs_cert_entry}")
    print(f"  repr:                {repr(ce_null)}")


def demo_landmark_id():
    """Build a LandmarkID around the shared LogID fixture and parse it."""
    section("LandmarkID — LogID + tree size at landmark issuance")

    # ASN.1 shape: LandmarkID ::= SEQUENCE { logID LogID, treeSize INTEGER }
    tree_size_der = _int_der(4096)
    lm_id = mtc.LandmarkID.from_der(_seq(LOG_ID_DER, tree_size_der))

    assert lm_id.tree_size == 4096
    assert lm_id.log_id.hash_algorithm_oid == SHA256_OID

    print(f"  tree_size:              {lm_id.tree_size}")
    print(f"  log_id.hash_alg_oid:    {lm_id.log_id.hash_algorithm_oid}")
    print(f"  log_id.public_key_der:  {len(lm_id.log_id.public_key_der)} bytes  (SPKI)")
    print(f"  repr:                   {repr(lm_id)}")


def main():
    """Print the example banner and run every MTC demo in order."""
    banner_rule = "=" * 60
    print(banner_rule)
    print("Example 21: Merkle Tree Certificate (MTC) ASN.1 types")
    print(banner_rule)

    demos = (
        demo_proof_node,
        demo_subtree,
        demo_subtree_proof,
        demo_inclusion_proof,
        demo_log_id,
        demo_cosigner_id,
        demo_checkpoint,
        demo_subtree_signature,
        demo_merkle_tree_cert_entry,
        demo_landmark_id,
    )
    for run_demo in demos:
        run_demo()

    print("\nAll MTC examples completed.")


# Run the demos only when executed as a script, not when imported.
if __name__ == "__main__":
    main()