# bjorn-acme 0.3.0
#
# Building blocks for an ACME server.
# Documentation
import traceback
import typing

from . import models
from django.conf import settings
from django.utils import timezone
import datetime
import cryptography.x509
import cryptography.x509.oid
import cryptography.x509.certificate_transparency
import cryptography.hazmat.primitives.serialization
import cryptography.hazmat.primitives.hashes
import requests
import base64
import uuid


# Issuing CA record that sign_order() uses as the issuer of every certificate.
# NOTE(review): this lookup runs at import time against a hard-coded id —
# presumably a Django manager query that raises if the row is missing; confirm.
INTERMEDIATE_CA = models.IssuingCert.objects.get(id="3cdc5e60-5cb2-4248-8265-813128fc786c")
# Private key matching INTERMEDIATE_CA, read once at import time from
# <BASE_DIR>/ca-certs/intermediate-key.pem. The `None` password argument means
# the PEM file is loaded as an unencrypted key.
with open(settings.BASE_DIR / "ca-certs" / "intermediate-key.pem", "rb") as intermediate:
    INTERMEDIATE_CA_KEY = cryptography.hazmat.primitives.serialization.load_pem_private_key(intermediate.read(), None)


def build_cert(
        issuer_cert: models.IssuingCert,
        csr: cryptography.x509.CertificateSigningRequest, now, order: models.Order,
        builder: cryptography.x509.CertificateBuilder, cert_id
) -> cryptography.x509.CertificateBuilder:
    """Populate ``builder`` with the fields shared by the precertificate and final certificate.

    The returned builder is *not* signed; the caller adds any remaining
    extensions (poison / SCT list) and signs it.

    :param issuer_cert: Issuing CA record. Its ``cert_obj()`` supplies the issuer
        name and authority key identifier, and its ``crl_url``, ``cert_url`` and
        ``ocsp_responder_url`` attributes (when truthy) populate the CRL
        distribution point and Authority Information Access extensions.
    :param csr: Certificate signing request; only its public key is used.
    :param now: Start of the validity period; the certificate expires 30 days later.
    :param order: Order whose identifiers of type ``models.ID_DNS`` become the
        subject alternative names. The first one is also used as the common name.
    :param builder: Builder to extend.
    :param cert_id: Object whose ``.int`` attribute is used as the serial number
        (the caller in this module passes a ``uuid.UUID``).
    :return: The extended certificate builder.
    :raises ValueError: If the order has no DNS identifiers.
    """
    # Use the issuer that was passed in rather than a module-level global, so the
    # issuer name / authority key identifier always agree with ``issuer_cert``.
    issuer_cert_obj = issuer_cert.cert_obj()
    dns_labels = [i.identifier for i in order.identifiers.all() if i.id_type == models.ID_DNS]
    if not dns_labels:
        # Fail with a clear message instead of an IndexError on dns_labels[0].
        raise ValueError("order has no DNS identifiers to issue a certificate for")

    public_key = csr.public_key()

    builder = builder.public_key(public_key)
    builder = builder.serial_number(cert_id.int)
    builder = builder.not_valid_before(now)
    builder = builder.not_valid_after(now + datetime.timedelta(days=30))
    builder = builder.issuer_name(issuer_cert_obj.subject)
    builder = builder.subject_name(cryptography.x509.Name([
        cryptography.x509.NameAttribute(cryptography.x509.NameOID.COMMON_NAME, dns_labels[0]),
    ]))
    builder = builder.add_extension(
        cryptography.x509.SubjectAlternativeName(
            [cryptography.x509.DNSName(i) for i in dns_labels]
        ), critical=False
    )
    # End-entity certificate: never a CA.
    builder = builder.add_extension(
        cryptography.x509.BasicConstraints(ca=False, path_length=None), critical=True
    )
    builder = builder.add_extension(
        cryptography.x509.SubjectKeyIdentifier.from_public_key(public_key), critical=False
    )
    builder = builder.add_extension(
        cryptography.x509.AuthorityKeyIdentifier.from_issuer_public_key(issuer_cert_obj.public_key()),
        critical=False
    )
    builder = builder.add_extension(
        cryptography.x509.KeyUsage(
            digital_signature=True, key_encipherment=True, content_commitment=False, data_encipherment=False,
            key_agreement=False, key_cert_sign=False, encipher_only=False, decipher_only=False, crl_sign=False,
        ), critical=True
    )
    builder = builder.add_extension(
        cryptography.x509.ExtendedKeyUsage(usages=[
            cryptography.x509.ExtendedKeyUsageOID.SERVER_AUTH,
            cryptography.x509.ExtendedKeyUsageOID.CLIENT_AUTH,
        ]), critical=False
    )

    if issuer_cert.crl_url:
        builder = builder.add_extension(
            cryptography.x509.CRLDistributionPoints([
                cryptography.x509.DistributionPoint(
                    full_name=[
                        cryptography.x509.UniformResourceIdentifier(issuer_cert.crl_url)
                    ],
                    relative_name=None, crl_issuer=None, reasons=None
                )
            ]), critical=False
        )

    access_descriptions = []
    if issuer_cert.cert_url:
        access_descriptions.append(cryptography.x509.AccessDescription(
            access_method=cryptography.x509.oid.AuthorityInformationAccessOID.CA_ISSUERS,
            access_location=cryptography.x509.UniformResourceIdentifier(issuer_cert.cert_url)
        ))
    if issuer_cert.ocsp_responder_url:
        access_descriptions.append(cryptography.x509.AccessDescription(
            access_method=cryptography.x509.oid.AuthorityInformationAccessOID.OCSP,
            access_location=cryptography.x509.UniformResourceIdentifier(issuer_cert.ocsp_responder_url)
        ))
    if access_descriptions:
        # ``add_extension`` requires the ``critical`` argument; omitting it raised
        # TypeError whenever an AIA URL was configured.
        builder = builder.add_extension(
            cryptography.x509.AuthorityInformationAccess(access_descriptions), critical=False
        )

    return builder


class SCT:
    """A single signed certificate timestamp as returned by a CT log.

    ``log_id``, ``extensions`` and ``signature`` are accepted as base64 text and
    stored decoded; ``version`` and ``timestamp`` are stored as given.
    """

    def __init__(self, version: int, log_id: str, timestamp: int, extensions: str, signature: str):
        decode = base64.b64decode
        self._version = version
        self._timestamp = timestamp
        self._log_id = decode(log_id)
        self._extensions = decode(extensions)
        self._signature = decode(signature)

    def encoded(self):
        """Return the binary serialisation of this SCT.

        Layout: 1-byte version, log id (at most its first 32 bytes), 8-byte
        big-endian timestamp, 2-byte big-endian extensions length, the
        extensions, then the signature bytes appended as-is.
        """
        extensions = self._extensions
        return b"".join((
            self._version.to_bytes(1, byteorder="big"),
            self._log_id[:32],
            self._timestamp.to_bytes(8, byteorder="big"),
            len(extensions).to_bytes(2, byteorder="big"),
            extensions,
            self._signature,
        ))


class SCTList:
    """An ordered collection of SCTs that can be serialised for embedding in a certificate."""

    def __init__(self, scts: typing.List[SCT]):
        self._scts = scts

    def encoded(self):
        """Return the list serialisation.

        Each SCT's ``encoded()`` bytes are prefixed with their 2-byte big-endian
        length; the concatenation of those entries is itself prefixed with its
        2-byte big-endian total length.
        """
        def framed_entries():
            # Encode and frame one SCT at a time, in list order.
            for entry in self._scts:
                payload = entry.encoded()
                yield len(payload).to_bytes(2, byteorder="big")
                yield payload

        body = b"".join(framed_entries())
        return len(body).to_bytes(2, byteorder="big") + body

    def encoded_asn1(self):
        """Return ``encoded()`` wrapped as a DER-style OCTET STRING (tag 0x04).

        Lengths below 128 use the single-byte short form; larger lengths use the
        long form (0x80 | number of length bytes, followed by the big-endian length).
        """
        content = self.encoded()
        size = len(content)

        if size < 128:
            header = bytes((0x04, size))
        else:
            # Minimal big-endian representation of the length.
            size_octets = size.to_bytes((size.bit_length() + 7) // 8, byteorder="big")
            header = bytes((0x04, 0x80 | len(size_octets))) + size_octets

        return header + content


# Seconds to wait on each CT log HTTP request before giving up, so that an
# unresponsive log cannot stall issuance indefinitely.
_CT_REQUEST_TIMEOUT = 30


def sign_order(order: models.Order):
    """Issue a certificate for ``order`` and attach it to the order.

    Steps:

    1. Build and sign a precertificate (with the poison extension) using
       ``INTERMEDIATE_CA`` / ``INTERMEDIATE_CA_KEY``.
    2. Submit the precertificate chain to every log in ``settings.CT_LOGS`` whose
       ``expiry_range`` is ``None`` or contains the certificate's expiry date,
       collecting the returned SCTs.
    3. Build the final certificate with the same fields and serial number,
       embedding the SCT list when at least one SCT was obtained, and sign it.
    4. Submit the final certificate chain to the same logs.
    5. Save a ``models.Certificate`` and link it from ``order``.

    :param order: Order to issue for; ``order.csr`` must hold a DER-encoded CSR.
    :return: ``None``. Any exception raised along the way is printed with its
        traceback and swallowed, leaving ``order.certificate`` unchanged.
    """
    try:
        # Base64 DER chain from the intermediate up through its issuers.
        chain_bytes = []
        issued_by = INTERMEDIATE_CA
        while issued_by:
            chain_bytes.append(base64.b64encode(issued_by.cert).decode())
            issued_by = issued_by.issued_by

        cert_id = uuid.uuid4()
        now = timezone.now()
        csr = cryptography.x509.load_der_x509_csr(order.csr)

        precert_builder = cryptography.x509.CertificateBuilder()
        precert_builder = build_cert(INTERMEDIATE_CA, csr, now, order, precert_builder, cert_id)
        precert_builder = precert_builder.add_extension(cryptography.x509.PrecertPoison(), critical=True)
        precert = precert_builder.sign(INTERMEDIATE_CA_KEY, cryptography.hazmat.primitives.hashes.SHA512())
        precert_bytes = precert.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.DER)

        # Only submit to logs that accept certificates expiring on this date.
        expiry_date = precert.not_valid_after
        ct_logs = [
            log["url"] for log in settings.CT_LOGS
            if log["expiry_range"] is None or (
                log["expiry_range"]["start"] <= expiry_date < log["expiry_range"]["end"]
            )
        ]

        scts = []
        for ct in ct_logs:
            r = requests.post(f"{ct}/ct/v1/add-pre-chain", json={
                "chain": [base64.b64encode(precert_bytes).decode()] + chain_bytes
            }, timeout=_CT_REQUEST_TIMEOUT)
            try:
                r.raise_for_status()
            except requests.exceptions.RequestException:
                print(f"Failed to submit to {ct}: {r.text}")
                raise
            sct = r.json()
            scts.append(SCT(
                version=sct["sct_version"],
                log_id=sct["id"],
                timestamp=sct["timestamp"],
                extensions=sct["extensions"],
                signature=sct["signature"],
            ))

        # Same cert_id and `now` as the precertificate so both share serial and validity.
        eecert_builder = cryptography.x509.CertificateBuilder()
        eecert_builder = build_cert(INTERMEDIATE_CA, csr, now, order, eecert_builder, cert_id)

        # An SCT list must contain at least one SCT (RFC 6962), so the extension
        # is omitted entirely when no log was applicable.
        if scts:
            sct_list = SCTList(scts)
            eecert_builder = eecert_builder.add_extension(
                cryptography.x509.UnrecognizedExtension(
                    oid=cryptography.x509.ExtensionOID.PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS,
                    value=sct_list.encoded_asn1()
                ), critical=False
            )

        eecert = eecert_builder.sign(INTERMEDIATE_CA_KEY, cryptography.hazmat.primitives.hashes.SHA512())
        eecert_bytes = eecert.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.DER)

        for ct in ct_logs:
            r = requests.post(f"{ct}/ct/v1/add-chain", json={
                "chain": [base64.b64encode(eecert_bytes).decode()] + chain_bytes
            }, timeout=_CT_REQUEST_TIMEOUT)
            r.raise_for_status()

        certificate = models.Certificate(
            id=cert_id,
            issued_at=now,
            ee_cert=eecert_bytes,
            issued_by=INTERMEDIATE_CA
        )
        certificate.save()
        order.certificate = certificate
        order.save()
    except Exception:
        # Best-effort: report the failure and leave the order without a certificate.
        # print_exc() works on every Python 3 version, unlike the single-argument
        # form of print_exception() which needs 3.10+.
        traceback.print_exc()