trazaeo 0.5.6

Open-source provenance SDK and specification for verifiable EO and climate data workflows
Documentation
use super::util::{from_json, into_py_result, parse_verification_mode, to_json};
use crate::python_facade as facade;
use pyo3::prelude::*;
use pyo3::types::PyModule;
use pyo3::wrap_pyfunction;
use serde::de::DeserializeOwned;

fn sign_envelope(
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
    payload: &[u8],
) -> PyResult<facade::Attestation> {
    into_py_result(facade::make_attestation(
        signer_id,
        signing_key_hex,
        signed_at,
        payload,
    ))
}

trait SignedEnvelope: serde::Serialize {
    fn canonical_attestation_payload_bytes(&self) -> Vec<u8>;
    fn replace_attestation(&mut self, attestation: facade::Attestation);
    fn validate_for_python(&self, operation: &'static str) -> PyResult<()>;
}

trait SignedEnvelopeInput: DeserializeOwned {
    fn set_key_id(&mut self, key_id: String);
}

#[derive(Clone, Copy)]
struct EnvelopeJsonOperations {
    parse: &'static str,
    serialize: &'static str,
    validate: &'static str,
}

#[derive(Clone, Copy)]
struct SigningInputs<'a> {
    signer_id: &'a str,
    signing_key_hex: &'a str,
    signed_at: &'a str,
}

impl SignedEnvelopeInput for facade::CaptureSessionInput {
    fn set_key_id(&mut self, key_id: String) {
        self.key_id = key_id;
    }
}

impl SignedEnvelopeInput for facade::TransformStageInput {
    fn set_key_id(&mut self, key_id: String) {
        self.key_id = key_id;
    }
}

impl SignedEnvelopeInput for facade::PublishInput {
    fn set_key_id(&mut self, key_id: String) {
        self.key_id = key_id;
    }
}

impl SignedEnvelope for facade::CaptureEnvelope {
    fn canonical_attestation_payload_bytes(&self) -> Vec<u8> {
        self.canonical_attestation_payload_bytes()
    }

    fn replace_attestation(&mut self, attestation: facade::Attestation) {
        self.key_id = attestation.key_id.clone();
        self.attestations = vec![attestation];
    }

    fn validate_for_python(&self, operation: &'static str) -> PyResult<()> {
        self.validate().map_err(|errs| {
            super::util::py_err(crate::error::TrazaeoError::validation(operation, errs))
        })
    }
}

impl SignedEnvelope for facade::TransformEnvelope {
    fn canonical_attestation_payload_bytes(&self) -> Vec<u8> {
        self.canonical_attestation_payload_bytes()
    }

    fn replace_attestation(&mut self, attestation: facade::Attestation) {
        self.key_id = attestation.key_id.clone();
        self.attestations = vec![attestation];
    }

    fn validate_for_python(&self, operation: &'static str) -> PyResult<()> {
        self.validate().map_err(|errs| {
            super::util::py_err(crate::error::TrazaeoError::validation(operation, errs))
        })
    }
}

impl SignedEnvelope for facade::PublishEnvelope {
    fn canonical_attestation_payload_bytes(&self) -> Vec<u8> {
        self.canonical_attestation_payload_bytes()
    }

    fn replace_attestation(&mut self, attestation: facade::Attestation) {
        self.key_id = attestation.key_id.clone();
        self.attestations = vec![attestation];
    }

    fn validate_for_python(&self, operation: &'static str) -> PyResult<()> {
        self.validate().map_err(|errs| {
            super::util::py_err(crate::error::TrazaeoError::validation(operation, errs))
        })
    }
}

fn prepare_seed_attestation(
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
) -> PyResult<(String, facade::Attestation)> {
    let seed = sign_envelope(signer_id, signing_key_hex, signed_at, b"")?;
    let key_id = seed.key_id.clone();
    Ok((
        key_id.clone(),
        facade::Attestation {
            signer_id: signer_id.to_string(),
            key_id,
            signature: String::new(),
            signed_at: signed_at.to_string(),
        },
    ))
}

#[pyfunction]
pub(crate) fn key_id_from_signing_key_hex(signing_key_hex: &str) -> PyResult<String> {
    let attestation = sign_envelope(
        "key-id-derivation",
        signing_key_hex,
        "1970-01-01T00:00:00Z",
        b"",
    )?;
    Ok(attestation.key_id)
}

fn finalize_signed_envelope_json<E>(
    serialize_operation: &'static str,
    validate_operation: &'static str,
    mut envelope: E,
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
) -> PyResult<String>
where
    E: SignedEnvelope,
{
    let payload = envelope.canonical_attestation_payload_bytes();
    let attestation = sign_envelope(signer_id, signing_key_hex, signed_at, &payload)?;
    envelope.replace_attestation(attestation);
    envelope.validate_for_python(validate_operation)?;
    to_json(serialize_operation, &envelope)
}

fn build_signed_envelope_json<I, E>(
    operations: EnvelopeJsonOperations,
    input_json: &str,
    signing: SigningInputs<'_>,
    builder: impl FnOnce(I, facade::Attestation) -> E,
) -> PyResult<String>
where
    I: SignedEnvelopeInput,
    E: SignedEnvelope,
{
    let mut input: I = from_json(operations.parse, input_json)?;
    let (key_id, unsigned_seed_attestation) = prepare_seed_attestation(
        signing.signer_id,
        signing.signing_key_hex,
        signing.signed_at,
    )?;
    input.set_key_id(key_id);
    let envelope = builder(input, unsigned_seed_attestation);
    finalize_signed_envelope_json(
        operations.serialize,
        operations.validate,
        envelope,
        signing.signer_id,
        signing.signing_key_hex,
        signing.signed_at,
    )
}

fn verify_publish_envelope_with_policy_json(
    publish_envelope_json: &str,
    mode: &str,
    trust_policy_json: &str,
) -> PyResult<String> {
    let envelope: facade::PublishEnvelope =
        from_json("parse publish envelope", publish_envelope_json)?;
    let mode = parse_verification_mode(mode)?;
    let trust: facade::TrustPolicy = from_json("parse trust policy", trust_policy_json)?;
    let report = facade::verify_publish_envelope(&envelope, mode, &trust);
    to_json("serialize verification report", &report)
}

#[pyfunction]
pub(crate) fn build_capture_envelope_json(
    session_input_json: &str,
    messages_json: &str,
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
) -> PyResult<String> {
    let messages: Vec<facade::CapturedSegment> =
        from_json("parse captured segments", messages_json)?;
    build_signed_envelope_json(
        EnvelopeJsonOperations {
            parse: "parse capture session input",
            serialize: "serialize capture envelope",
            validate: "validate capture envelope",
        },
        session_input_json,
        SigningInputs {
            signer_id,
            signing_key_hex,
            signed_at,
        },
        |input, unsigned_seed_attestation| {
            facade::build_capture_envelope(&input, &messages, unsigned_seed_attestation)
        },
    )
}

#[pyfunction]
pub(crate) fn build_transform_envelope_json(
    transform_input_json: &str,
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
) -> PyResult<String> {
    build_signed_envelope_json(
        EnvelopeJsonOperations {
            parse: "parse transform input",
            serialize: "serialize transform envelope",
            validate: "validate transform envelope",
        },
        transform_input_json,
        SigningInputs {
            signer_id,
            signing_key_hex,
            signed_at,
        },
        |input, unsigned_seed_attestation| {
            facade::build_transform_envelope(&input, unsigned_seed_attestation)
        },
    )
}

#[pyfunction]
pub(crate) fn build_publish_envelope_json(
    publish_input_json: &str,
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
) -> PyResult<String> {
    build_signed_envelope_json(
        EnvelopeJsonOperations {
            parse: "parse publish input",
            serialize: "serialize publish envelope",
            validate: "validate publish envelope",
        },
        publish_input_json,
        SigningInputs {
            signer_id,
            signing_key_hex,
            signed_at,
        },
        |input, unsigned_seed_attestation| {
            facade::build_publish_envelope(&input, unsigned_seed_attestation)
        },
    )
}

#[pyfunction]
pub(crate) fn verify_publish_envelope_json(
    publish_envelope_json: &str,
    mode: &str,
    trust_policy_json: &str,
) -> PyResult<String> {
    verify_publish_envelope_with_policy_json(publish_envelope_json, mode, trust_policy_json)
}

pub(crate) fn register_envelope_bindings(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(key_id_from_signing_key_hex, m)?)?;
    m.add_function(wrap_pyfunction!(build_capture_envelope_json, m)?)?;
    m.add_function(wrap_pyfunction!(build_transform_envelope_json, m)?)?;
    m.add_function(wrap_pyfunction!(build_publish_envelope_json, m)?)?;
    m.add_function(wrap_pyfunction!(verify_publish_envelope_json, m)?)?;
    Ok(())
}