synta 0.2.6

ASN.1 parser, decoder, and encoder library with DER/BER support and C FFI
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
#!/usr/bin/env python3
"""
Example: Merkle Tree Certificates — builder and validator APIs
(draft-ietf-plants-merkle-tree-certs)

Demonstrates:
  - HashAlgorithm — name(), output_size(), oid()
  - IssuanceLogBuilder — accumulate log entries, compute Merkle tree,
    generate checkpoints and inclusion proofs
  - StandaloneCertificateBuilder — assemble a StandaloneCertificate
    with an embedded SubtreeSignature cosignature
  - StandaloneCertificate — parse the produced DER and inspect fields
  - MtcProof — decode / encode round-trip on TLS wire bytes
  - extract_leaf_index / extract_log_number — serial-number helpers (§6.1)
  - RevokedRanges — add ranges, check is_revoked, verify merging
  - ValidationPolicy + TrustAnchor + CertificateValidator —
    validate_standalone against a trust anchor derived from the
    checkpoint produced by IssuanceLogBuilder

NOTE: Signatures in this example are placeholder bytes used to illustrate
the DER/TLS structure.  The inclusion-proof verification round-trips
correctly; only cryptographic signature verification would fail.

Run with:
    PYTHONPATH=python python3 examples/example_mtc_builders.py
"""

import hashlib
import time

import synta
import synta.mtc as mtc

# ── Pre-encoded DER constants ──────────────────────────────────────────────────
#
# These are hand-built using the synta encoder so they remain self-contained
# and do not require any external library (no cryptography import).

def _enc(fn):
    """Run *fn* against a fresh DER encoder and return the bytes it produced.

    *fn* receives the encoder as its only argument; whatever it returns is
    ignored, only the encoder's finished output is used.
    """
    encoder = synta.Encoder(synta.Encoding.DER)
    fn(encoder)
    der = encoder.finish()
    return der


def _seq(*parts):
    """Join already-encoded TLV *parts* and wrap them in a SEQUENCE (tag 0x30)."""
    # The parts are concatenated verbatim; they become the SEQUENCE content.
    return _enc(lambda encoder: encoder.encode_sequence(b"".join(parts)))


def _explicit(n, content):
    """Return *content* wrapped in a constructed ``[n] EXPLICIT`` context tag."""
    def write(encoder):
        encoder.encode_explicit_tag(n, "Context", content)

    return _enc(write)


def _oid(s):
    """Return the DER encoding of the OBJECT IDENTIFIER written in dotted form *s*."""
    oid = synta.ObjectIdentifier(s)
    return _enc(lambda encoder: encoder.encode_oid(oid))


def _oct(b):
    """Return the DER encoding of *b* as an OCTET STRING."""
    def write(encoder):
        encoder.encode_octet_string(b)

    return _enc(write)


def _bit(b):
    """Return the DER encoding of *b* as a BIT STRING."""
    # NOTE(review): the second BitString argument (0) is presumably the
    # unused-bits count — confirm against the synta.BitString signature.
    bits = synta.BitString(b, 0)
    return _enc(lambda encoder: encoder.encode_bit_string(bits))


def _gt(yr, mo, d, hr, mi, s):
    """Return the DER GeneralizedTime for the given year/month/day/hour/minute/second."""
    # NOTE(review): the trailing None is presumably an optional sub-second
    # component — confirm against the synta.GeneralizedTime signature.
    moment = synta.GeneralizedTime(yr, mo, d, hr, mi, s, None)
    return _enc(lambda encoder: encoder.encode_generalized_time(moment))


def _int(v):
    """Return the DER encoding of *v* as an INTEGER."""
    def write(encoder):
        encoder.encode_integer(v)

    return _enc(write)


def _name(cn: str) -> bytes:
    """Return a DER X.501 Name carrying one commonName (2.5.4.3) set to *cn*.

    The value is encoded as a UTF8String.  Nesting, outermost first:
    SEQUENCE (the Name) > SET > SEQUENCE { OID, value }.
    """
    attribute_type = _oid("2.5.4.3")
    attribute_value = _enc(lambda encoder: encoder.encode_utf8_string(cn))
    atav = _seq(attribute_type, attribute_value)       # SEQUENCE { OID, value }
    rdn = _enc(lambda encoder: encoder.encode_set(atav))  # SET { SEQUENCE { … } }
    return _seq(rdn)                                    # SEQUENCE { SET { … } }


# DER NULL, used below as the explicit "parameters" member of some
# AlgorithmIdentifiers.
_NULL = _enc(lambda encoder: encoder.encode_null())

# Hash algorithm: SHA-256.  The AlgorithmIdentifier carries NULL parameters
# (per MTC spec).
_SHA256_OID = "2.16.840.1.101.3.4.2.1"
_SHA256_ALG = _seq(_oid(_SHA256_OID), _NULL)

# Subject keys: ecPublicKey with the P-256 curve OID as parameters.
_EC_OID = "1.2.840.10045.2.1"
_P256_OID = "1.2.840.10045.3.1.7"
_EC_ALG = _seq(_oid(_EC_OID), _oid(_P256_OID))

# Log-operator SPKI stub: RSA AlgorithmIdentifier with NULL parameters.
_RSA_OID = "1.2.840.113549.1.1.1"
_RSA_ALG = _seq(_oid(_RSA_OID), _NULL)

# Signature algorithm: ECDSA-with-SHA-256; parameters are absent per RFC 5758.
_ECDSA_OID = "1.2.840.10045.4.3.2"
_ECDSA_ALG = _seq(_oid(_ECDSA_OID))

# LogID: SEQUENCE { hashAlgorithm AlgorithmIdentifier, publicKey SPKI }.
# The "key" is a 32-byte stub (01 02 03 04 repeated 8 times), not a real RSA
# key; the hash algorithm is SHA-256.
_LOG_SPKI = _seq(_RSA_ALG, _bit(bytes((1, 2, 3, 4)) * 8))
_LOG_ID_DER = _seq(_SHA256_ALG, _LOG_SPKI)

# Placeholder signature: ca fe ba be repeated 8 times = 32 bytes.
_DUMMY_SIG = bytes.fromhex("cafebabe") * 8

# CosignerID OID (TrustAnchorID = OBJECT IDENTIFIER, spec §4.1)
_COSIGNER_OID = "1.3.6.1.4.1.44363.47.10.1"


# ── Utility helpers ────────────────────────────────────────────────────────────

def section(title: str) -> None:
    """Print *title* framed by a 60-column horizontal rule above and below.

    A blank line is emitted first so consecutive sections are visually
    separated in the console output.

    Args:
        title: Heading text printed between the two rules.
    """
    # The rule character must be non-empty: repeating "" yields "" and the
    # banner silently degenerates into blank lines.
    rule = "─" * 60
    print(f"\n{rule}\n{title}\n{rule}")


def _make_ec_spki(domain: str) -> bytes:
    """Build a stub EC SubjectPublicKeyInfo whose key bytes depend only on *domain*.

    The result is deterministic, and distinct domains give distinguishable
    SPKI bytes.  The key is not a real curve point.
    """
    digest = hashlib.sha256(domain.encode()).digest()
    # 0x04 prefix followed by the 32-byte digest twice: 1 + 32 + 32 = 65 bytes.
    stub_key = b"\x04" + digest + digest
    return _seq(_EC_ALG, _bit(stub_key))


def _make_tbs_log_entry(domain: str) -> bytes:
    """Return the DER TBSCertificateLogEntry for *domain* (IssuanceLogBuilder input).

    Fields, in encoding order:
        SEQUENCE {
            issuer                     Name             -- "Example CA"
            validity                   Validity         -- 2026-01-01 .. 2027-01-01
            subject                    Name             -- *domain*
            subjectPublicKeyAlgorithm  AlgorithmIdentifier
            subjectPublicKeyInfoHash   OCTET STRING     -- SHA-256 of the stub SPKI
        }

    The value is the bare TBSCertificateLogEntry SEQUENCE that add_entry()
    expects, not the MerkleTreeCertEntry [1] EXPLICIT wrapper.
    """
    not_before = _gt(2026, 1, 1, 0, 0, 0)
    not_after = _gt(2027, 1, 1, 0, 0, 0)
    spki_digest = hashlib.sha256(_make_ec_spki(domain)).digest()

    fields = (
        _name("Example CA"),
        _seq(not_before, not_after),
        _name(domain),
        _EC_ALG,
        _oct(spki_digest),
    )
    return _seq(*fields)


def _make_tbs_cert(serial: int, domain: str) -> bytes:
    """Return a minimal DER TBSCertificate for *domain* carrying *serial*.

    Per spec §6.1 the serialNumber must equal the log-entry index, so that a
    validator can recover the leaf index from the certificate.
    """
    fields = [
        _explicit(0, _int(2)),      # version [0] EXPLICIT, INTEGER 2 (= v3)
        _int(serial),               # serialNumber = leaf_index (spec §6.1)
        _ECDSA_ALG,                 # signatureAlgorithm
        _name("Example CA"),        # issuer
        _seq(                       # validity: notBefore, notAfter
            _gt(2026, 1, 1, 0, 0, 0),
            _gt(2027, 1, 1, 0, 0, 0),
        ),
        _name(domain),              # subject
        _make_ec_spki(domain),      # subjectPublicKeyInfo
    ]
    return _seq(*fields)


# ── Step 1: HashAlgorithm ─────────────────────────────────────────────────────

def demo_hash_algorithm() -> None:
    """Print and sanity-check the HashAlgorithm accessors for SHA-256 and SHA-384."""
    section("1 — HashAlgorithm: name, output_size, oid")

    sha256 = mtc.HashAlgorithm.Sha256
    sha384 = mtc.HashAlgorithm.Sha384

    print(f"  algorithm : {sha256!r}")
    print(f"  name()    : {sha256.name()}")
    print(f"  output_size() : {sha256.output_size()} bytes")
    print(f"  oid()     : {sha256.oid()}")

    # The accessors must agree with the constants this example uses elsewhere.
    assert sha256.name() == "SHA-256"
    assert sha256.output_size() == 32
    assert sha256.oid() == _SHA256_OID

    # Members compare equal to themselves and unequal to other members.
    assert sha256 == mtc.HashAlgorithm.Sha256
    assert sha256 != sha384

    print(
        f"  Sha384 name: {sha384.name()}  "
        f"({sha384.output_size()} bytes)"
    )


# ── Step 2: IssuanceLogBuilder ────────────────────────────────────────────────

def demo_issuance_log() -> tuple[bytes, bytes, list[bytes], int]:
    """Build a small issuance log and walk through the IssuanceLogBuilder API.

    Returns:
        ``(checkpoint_der, root, leaf_hashes, tree_size)`` — the DER
        checkpoint, the Merkle root, the per-leaf hashes (index 0 included)
        and the final tree size, for reuse by later demo steps.
    """
    section("2 — IssuanceLogBuilder: build log, checkpoint, inclusion proofs")

    # Configure a fresh builder: hash algorithm, then the LogID DER.
    log = mtc.IssuanceLogBuilder()
    log.hash_algorithm(mtc.HashAlgorithm.Sha256)
    log.log_id(_LOG_ID_DER)

    # Index 0 is the mandatory null entry reserved by the spec, so the first
    # real entry lands at index 1.
    print(f"  Initial tree_size (null entry only): {log.tree_size()}")
    assert log.tree_size() == 1

    # Append one log entry per subscriber: alice -> index 1, bob -> index 2.
    entries = [
        _make_tbs_log_entry(domain)
        for domain in ("alice.example.com", "bob.example.com")
    ]
    for entry in entries:
        log.add_entry(entry)

    print(f"  After adding 2 entries, tree_size: {log.tree_size()}")
    assert log.tree_size() == 3   # null + alice + bob

    # One leaf hash per entry, the null entry included.
    leaf_hashes = log.compute_leaf_hashes()
    first_leaf_len = len(leaf_hashes[0])
    print(f"  compute_leaf_hashes(): {len(leaf_hashes)} hashes, "
          f"each {first_leaf_len} bytes")
    assert len(leaf_hashes) == 3
    assert first_leaf_len == 32

    # Merkle root over all three leaves.
    root = log.compute_root()
    print(f"  Merkle root: {root.hex()}")
    assert len(root) == 32

    # A fixed Unix timestamp keeps the example reproducible; production code
    # would pass int(time.time()) instead.
    unix_ts = 1_751_328_000   # 2025-07-01 00:00:00 UTC
    cp_der = log.checkpoint_at_unix(unix_ts)
    print(f"  checkpoint_at_unix({unix_ts}): {len(cp_der)} bytes DER")

    # Decode the checkpoint again and check it matches what was just computed.
    checkpoint = mtc.Checkpoint.from_der(cp_der)
    print(f"    tree_size      : {checkpoint.tree_size}")
    print(f"    root_value     : {checkpoint.root_value.hex()}")
    print(f"    timestamp      : {checkpoint.timestamp}")
    assert checkpoint.tree_size == 3
    assert checkpoint.root_value == root

    # Inclusion proofs for the two real entries, addressed by absolute index.
    alice_proof = log.generate_proof(1)
    bob_proof = log.generate_proof(2)
    print(f"  generate_proof(1) — alice: {len(alice_proof)} sibling hashes")
    print(f"  generate_proof(2) — bob:   {len(bob_proof)} sibling hashes")

    print(f"  repr: {log!r}")

    final_size = log.tree_size()
    return cp_der, root, leaf_hashes, final_size


# ── Step 3: MtcProof decode / encode round-trip ───────────────────────────────

def demo_mtc_proof(leaf_hashes: list[bytes]) -> None:
    """Decode a hand-built MtcProof and check that it re-encodes byte-for-byte.

    Args:
        leaf_hashes: Leaf hashes of the demo log; element 0 is used as the
            single sibling hash in the inclusion proof (the assertions below
            require it to be 32 bytes long).
    """
    section("3 — MtcProof: decode / encode round-trip")

    # Minimal MtcProof in TLS wire format (spec §4.3):
    #   extensions      : u16 length + bytes
    #   start, end      : uint48 each (6 bytes big-endian)
    #   inclusion_proof : u16 length + bytes
    #   signatures      : u16 count + (per-cosignature records)
    sibling = leaf_hashes[0]
    wire = b"".join((
        (0).to_bytes(2, "big"),             # extensions: length=0
        (0).to_bytes(6, "big"),             # start=0
        (2).to_bytes(6, "big"),             # end=2
        len(sibling).to_bytes(2, "big"),    # inclusion_proof length
        sibling,                            # one sibling hash
        (0).to_bytes(2, "big"),             # signatures: count=0
    ))

    decoded = mtc.MtcProof.decode(wire)
    print(f"  MtcProof.decode: {decoded!r}")
    print(f"    start           : {decoded.start}")
    print(f"    end             : {decoded.end}")
    print(f"    inclusion_proof : {len(decoded.inclusion_proof)} bytes")
    print(f"    signatures      : {len(decoded.signatures)} records")
    assert decoded.start == 0
    assert decoded.end == 2
    assert len(decoded.inclusion_proof) == 32
    assert len(decoded.signatures) == 0

    # Encoding the decoded proof must reproduce the input exactly.
    round_trip = decoded.encode()
    assert round_trip == wire, "MtcProof encode/decode round-trip failed"
    print(f"  encode() round-trip: OK ({len(round_trip)} bytes)")


# ── Step 4: extract_leaf_index / extract_log_number ──────────────────────────

def demo_serial_helpers() -> None:
    """Show that extract_leaf_index / extract_log_number split a serial number.

    The serialNumber layout used here is ``(log_number << 48) | log_entry_index``.
    """
    section("4 — extract_leaf_index / extract_log_number (spec §6.1)")

    log_number, leaf_index = 3, 42
    serial = (log_number << 48) | leaf_index

    got_leaf = mtc.extract_leaf_index(serial)
    got_log = mtc.extract_log_number(serial)

    print(f"  serial = (log_number={log_number} << 48) | leaf_index={leaf_index}")
    print(f"  extract_leaf_index({serial}) = {got_leaf}")
    print(f"  extract_log_number({serial}) = {got_log}")
    assert got_leaf == leaf_index
    assert got_log == log_number

    # With the high bits clear the serial is just the leaf index and the
    # log number is 0 (the default).
    small_serial = 1
    assert mtc.extract_leaf_index(small_serial) == 1
    assert mtc.extract_log_number(small_serial) == 0
    print(f"  leaf_index of serial=1:  {mtc.extract_leaf_index(small_serial)}")
    print(f"  log_number of serial=1:  {mtc.extract_log_number(small_serial)}")


# ── Step 5: RevokedRanges ─────────────────────────────────────────────────────

def demo_revoked_ranges() -> None:
    """Exercise RevokedRanges: adding ranges, merging on overlap, and lookups."""
    section("5 — RevokedRanges: add, merge, is_revoked")

    ranges = mtc.RevokedRanges()
    assert ranges.is_empty()

    # Two disjoint ranges are kept as two separate entries.
    ranges.add(10, 20)
    ranges.add(30, 40)
    print(f"  After add(10,20) and add(30,40): {ranges.ranges()}")
    assert len(ranges) == 2
    for index, expected in ((15, True), (35, True), (5, False), (25, False)):
        assert ranges.is_revoked(index) is expected

    # A range bridging both existing ones collapses everything into one.
    ranges.add(18, 32)
    print(f"  After add(18,32) — overlapping merge: {ranges.ranges()}")
    assert len(ranges) == 1
    for index, expected in ((25, True), (9, False), (41, False)):
        assert ranges.is_revoked(index) is expected

    # contains_range is True only when the whole queried range is covered.
    assert ranges.contains_range(10, 40) is True
    assert ranges.contains_range(5, 40) is False   # 5..9 not covered

    print(f"  is_revoked(25): {ranges.is_revoked(25)}  (should be True)")
    print(f"  is_revoked(9):  {ranges.is_revoked(9)}   (should be False)")
    print(f"  contains_range(10,40): {ranges.contains_range(10, 40)}")
    print(f"  len(revoked): {len(ranges)}  (1 merged range)")
    print(f"  repr: {ranges!r}")


# ── Step 6: StandaloneCertificateBuilder + CertificateValidator ───────────────

def demo_standalone_and_validate(
    cp_der: bytes,
    root: bytes,
    leaf_hashes: list[bytes],
    tree_size: int,
) -> None:
    """Assemble alice's StandaloneCertificate, validate it, then reject it once revoked.

    Args:
        cp_der: DER checkpoint produced by the issuance-log demo; embedded in
            the SubtreeSignature.
        root: Merkle root of the log; used as the attested subtree hash.
        leaf_hashes: All leaf hashes of the log, passed to the builder for
            proof generation.
        tree_size: Number of leaves; the cosigned subtree is [0, tree_size).

    All signatures are placeholder bytes, so only structure and the
    inclusion proof are meaningful here.
    """
    section("6a — StandaloneCertificateBuilder: assemble for alice.example.com")

    # spec §6.1: serialNumber = (log_number << 48) | log_entry_index.
    # Alice sits at leaf index 1 and log_number defaults to 0.
    alice_index = 1
    alice_serial = (0 << 48) | alice_index
    tbs_cert_der = _make_tbs_cert(alice_serial, "alice.example.com")

    # SubtreeSignature over the full subtree [0, tree_size).  A real cosigner
    # would sign with its key; the placeholder signature only demonstrates
    # the structure.  The subtree hash MUST equal the Merkle root so that the
    # inclusion proof can be checked against the cosigner-attested value.
    subsig_der = _seq(
        _oid(_COSIGNER_OID),                              # cosigner ID
        _seq(_int(0), _int(tree_size), _oct(root)),       # subtree
        cp_der,                                           # checkpoint
        _ECDSA_ALG,
        _bit(_DUMMY_SIG),
    )

    cert_builder = mtc.StandaloneCertificateBuilder()
    cert_builder = cert_builder.tbs_certificate(tbs_cert_der)
    cert_builder = cert_builder.log_entry_index(alice_index)
    # The full leaf-hash list is supplied for proof generation.
    cert_builder = cert_builder.tree_leaves(leaf_hashes)
    cert_builder = cert_builder.hash_algorithm(mtc.HashAlgorithm.Sha256)
    cert_builder = cert_builder.add_subtree_signature(subsig_der)
    # Outer X.509 envelope: signature algorithm plus placeholder signature.
    cert_builder = cert_builder.signature_algorithm(_ECDSA_ALG)
    cert_builder = cert_builder.signature(_DUMMY_SIG)
    sc_der = cert_builder.build()
    print(f"  StandaloneCertificate DER: {len(sc_der)} bytes")

    # Decode what was just built and look at its fields.
    parsed = mtc.StandaloneCertificate.from_der(sc_der)
    print(f"  tbs_certificate_der: {len(parsed.tbs_certificate_der)} bytes")
    print(f"  signature_algorithm_oid: {parsed.signature_algorithm_oid}")
    print(f"  signature: {len(parsed.signature)} bytes (placeholder)")
    print(f"  subtree_signatures: {len(parsed.subtree_signatures)} record(s)")
    print(f"  inclusion_proof: {parsed.inclusion_proof!r}")
    assert parsed.signature_algorithm_oid == _ECDSA_OID
    assert len(parsed.subtree_signatures) == 1

    # ── 6b: CertificateValidator ──────────────────────────────────────────────
    section("6b — CertificateValidator: validate alice's StandaloneCertificate")

    # The trust anchor is derived from the same LogID the log builder used.
    anchor = mtc.TrustAnchor.from_log_id_der(_LOG_ID_DER, "Example CA Log")
    print(f"  TrustAnchor: {anchor!r}")
    print(f"    name  : {anchor.name}")
    print(f"    active: {anchor.active}")

    # permissive() disables expiration and timestamp checks and requires zero
    # cosignatures — suitable for test/illustration scenarios.
    policy = mtc.ValidationPolicy.permissive()
    print(f"  ValidationPolicy.permissive(): {policy!r}")
    print(f"    min_cosignatures: {policy.min_cosignatures}")
    print(f"    check_expiration: {policy.check_expiration}")

    validator = mtc.CertificateValidator(policy)
    validator.add_trust_anchor(anchor)
    print(f"  trust_anchor_count: {validator.trust_anchor_count()}")

    # Full pipeline: cosignature quorum, trusted anchor lookup, hash-algorithm
    # policy, leaf-index subtree coverage, revocation check, leaf-hash
    # computation, inclusion proof.
    validator.validate_standalone(sc_der)
    print("  validate_standalone: certificate is VALID")

    # ── Revocation ────────────────────────────────────────────────────────────
    section("6c — Revocation: reject a revoked leaf index via TrustAnchor")

    # A second anchor for the same log, with alice's leaf index revoked.
    revoked_anchor = mtc.TrustAnchor.from_log_id_der(
        _LOG_ID_DER, "Example CA Log (revoked)"
    )
    revoked_anchor.revoke_range(alice_index, alice_index)
    assert revoked_anchor.is_revoked(alice_index) is True
    print(f"  anchor.is_revoked(1): {revoked_anchor.is_revoked(1)}  (True — alice revoked)")
    print(f"  anchor.is_revoked(2): {revoked_anchor.is_revoked(2)}  (False — bob not revoked)")

    strict_validator = mtc.CertificateValidator(policy)
    strict_validator.add_trust_anchor(revoked_anchor)

    try:
        strict_validator.validate_standalone(sc_der)
    except ValueError as exc:
        # The failure must be the revocation check, not some other error.
        assert "revoked" in str(exc).lower(), f"unexpected error: {exc}"
        print(f"  validate_standalone (revoked anchor) raised ValueError: {exc}")
    else:
        raise AssertionError("expected ValueError — certificate was revoked")

    print("  Revocation check: OK")


# ── main ──────────────────────────────────────────────────────────────────────

def main() -> None:
    """Run every demo step in order, feeding the log outputs to the later steps."""
    print("=== MTC builder and validator example ===")

    demo_hash_algorithm()

    # (checkpoint DER, Merkle root, leaf hashes, tree size) from the log demo.
    log_outputs = demo_issuance_log()
    leaf_hashes = log_outputs[2]

    demo_mtc_proof(leaf_hashes)
    demo_serial_helpers()
    demo_revoked_ranges()

    # Positional order matches (cp_der, root, leaf_hashes, tree_size).
    demo_standalone_and_validate(*log_outputs)

    print("\nAll steps completed successfully.")


# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()