trazaeo 0.5.4

Open-source provenance SDK and specification for verifiable EO and climate data workflows
Documentation
use super::util::{from_json, into_py_result, parse_verification_mode, to_json};
use crate::python_facade as facade;
use pyo3::prelude::*;
use pyo3::types::PyModule;
use pyo3::wrap_pyfunction;

/// Produces an attestation over `payload` and adapts the outcome for Python.
///
/// The four arguments are handed unchanged to `facade::make_attestation`;
/// its result is then converted to a `PyResult` by `into_py_result`.
fn sign_envelope(
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
    payload: &[u8],
) -> PyResult<facade::Attestation> {
    let outcome = facade::make_attestation(signer_id, signing_key_hex, signed_at, payload);
    into_py_result(outcome)
}

/// Operations the signing helpers in this module require from an envelope type.
///
/// Implementors must be serializable because `finalize_signed_envelope_json`
/// serializes the envelope once it has been signed and validated.
trait SignedEnvelope
where
    Self: serde::Serialize,
{
    /// Returns the bytes that are signed to produce the envelope's attestation.
    fn canonical_attestation_payload_bytes(&self) -> Vec<u8>;

    /// Installs `attestation` on the envelope.
    fn replace_attestation(&mut self, attestation: facade::Attestation);

    /// Validates the envelope, returning a Python-facing error on failure.
    ///
    /// `operation` is a static label forwarded into the error that is built
    /// when validation fails.
    fn validate_for_python(&self, operation: &'static str) -> PyResult<()>;
}

impl SignedEnvelope for facade::CaptureEnvelope {
    fn canonical_attestation_payload_bytes(&self) -> Vec<u8> {
        self.canonical_attestation_payload_bytes()
    }

    fn replace_attestation(&mut self, attestation: facade::Attestation) {
        self.key_id = attestation.key_id.clone();
        self.attestations = vec![attestation];
    }

    fn validate_for_python(&self, operation: &'static str) -> PyResult<()> {
        self.validate().map_err(|errs| {
            super::util::py_err(crate::error::TrazaeoError::validation(operation, errs))
        })
    }
}

impl SignedEnvelope for facade::TransformEnvelope {
    fn canonical_attestation_payload_bytes(&self) -> Vec<u8> {
        self.canonical_attestation_payload_bytes()
    }

    fn replace_attestation(&mut self, attestation: facade::Attestation) {
        self.key_id = attestation.key_id.clone();
        self.attestations = vec![attestation];
    }

    fn validate_for_python(&self, operation: &'static str) -> PyResult<()> {
        self.validate().map_err(|errs| {
            super::util::py_err(crate::error::TrazaeoError::validation(operation, errs))
        })
    }
}

impl SignedEnvelope for facade::PublishEnvelope {
    fn canonical_attestation_payload_bytes(&self) -> Vec<u8> {
        self.canonical_attestation_payload_bytes()
    }

    fn replace_attestation(&mut self, attestation: facade::Attestation) {
        self.key_id = attestation.key_id.clone();
        self.attestations = vec![attestation];
    }

    fn validate_for_python(&self, operation: &'static str) -> PyResult<()> {
        self.validate().map_err(|errs| {
            super::util::py_err(crate::error::TrazaeoError::validation(operation, errs))
        })
    }
}

/// Derives the signer's key id and builds an unsigned placeholder attestation.
///
/// An attestation over an empty payload is created only to read its `key_id`.
/// The returned placeholder carries the caller's `signer_id` and `signed_at`,
/// that key id, and an empty `signature`.
///
/// Returns `(key_id, placeholder_attestation)`, or the error produced by
/// `sign_envelope`.
fn prepare_seed_attestation(
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
) -> PyResult<(String, facade::Attestation)> {
    // The empty-payload signature itself is discarded; only the key id is kept.
    let seed_key_id = sign_envelope(signer_id, signing_key_hex, signed_at, b"")?
        .key_id
        .clone();
    let placeholder = facade::Attestation {
        signer_id: signer_id.to_owned(),
        key_id: seed_key_id.clone(),
        signature: String::new(),
        signed_at: signed_at.to_owned(),
    };
    Ok((seed_key_id, placeholder))
}

/// Signs `envelope`, validates it, and serializes it to JSON.
///
/// Steps, in order:
/// 1. take the envelope's canonical attestation payload bytes (computed
///    before the new attestation is installed),
/// 2. create an attestation over those bytes with `sign_envelope`,
/// 3. install it through `SignedEnvelope::replace_attestation`,
/// 4. validate, labelling failures with `validate_operation`,
/// 5. serialize with `to_json`, labelling failures with `serialize_operation`.
///
/// The first failing step's error is returned.
fn finalize_signed_envelope_json<E: SignedEnvelope>(
    serialize_operation: &'static str,
    validate_operation: &'static str,
    mut envelope: E,
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
) -> PyResult<String> {
    let signed_attestation = {
        let canonical_bytes = envelope.canonical_attestation_payload_bytes();
        sign_envelope(signer_id, signing_key_hex, signed_at, &canonical_bytes)?
    };
    envelope.replace_attestation(signed_attestation);
    envelope.validate_for_python(validate_operation)?;
    to_json(serialize_operation, &envelope)
}

/// Verifies a JSON publish envelope against a JSON trust policy.
///
/// Inputs are parsed in this order: the envelope, the verification mode
/// string, then the trust policy; the first parse failure is returned. The
/// report from `facade::verify_publish_envelope` is serialized to JSON.
fn verify_publish_envelope_with_policy_json(
    publish_envelope_json: &str,
    mode: &str,
    trust_policy_json: &str,
) -> PyResult<String> {
    let publish_envelope: facade::PublishEnvelope =
        from_json("parse publish envelope", publish_envelope_json)?;
    let verification_mode = parse_verification_mode(mode)?;
    let trust_policy: facade::TrustPolicy = from_json("parse trust policy", trust_policy_json)?;
    let verification_report =
        facade::verify_publish_envelope(&publish_envelope, verification_mode, &trust_policy);
    to_json("serialize verification report", &verification_report)
}

/// Builds a signed capture envelope and returns it as a JSON string.
///
/// `session_input_json` is parsed as a capture session input and
/// `messages_json` as a list of captured segments. The input's `key_id` is
/// overwritten with the key id obtained from `prepare_seed_attestation`
/// before the envelope is built, then the envelope is signed, validated and
/// serialized by `finalize_signed_envelope_json`.
///
/// Returns an error if parsing, signing, validation or serialization fails.
#[pyfunction]
pub(crate) fn build_capture_envelope_json(
    session_input_json: &str,
    messages_json: &str,
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
) -> PyResult<String> {
    let mut session: facade::CaptureSessionInput =
        from_json("parse capture session input", session_input_json)?;
    let segments: Vec<facade::CapturedSegment> =
        from_json("parse captured segments", messages_json)?;

    let (seed_key_id, placeholder_attestation) =
        prepare_seed_attestation(signer_id, signing_key_hex, signed_at)?;
    session.key_id = seed_key_id;

    let unsigned_envelope =
        facade::build_capture_envelope(&session, &segments, placeholder_attestation);
    finalize_signed_envelope_json(
        "serialize capture envelope",
        "validate capture envelope",
        unsigned_envelope,
        signer_id,
        signing_key_hex,
        signed_at,
    )
}

/// Builds a signed transform envelope and returns it as a JSON string.
///
/// `transform_input_json` is parsed as a transform stage input. Its `key_id`
/// is overwritten with the key id obtained from `prepare_seed_attestation`
/// before the envelope is built, then the envelope is signed, validated and
/// serialized by `finalize_signed_envelope_json`.
///
/// Returns an error if parsing, signing, validation or serialization fails.
#[pyfunction]
pub(crate) fn build_transform_envelope_json(
    transform_input_json: &str,
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
) -> PyResult<String> {
    let mut stage_input: facade::TransformStageInput =
        from_json("parse transform input", transform_input_json)?;

    let (seed_key_id, placeholder_attestation) =
        prepare_seed_attestation(signer_id, signing_key_hex, signed_at)?;
    stage_input.key_id = seed_key_id;

    let unsigned_envelope = facade::build_transform_envelope(&stage_input, placeholder_attestation);
    finalize_signed_envelope_json(
        "serialize transform envelope",
        "validate transform envelope",
        unsigned_envelope,
        signer_id,
        signing_key_hex,
        signed_at,
    )
}

/// Builds a signed publish envelope and returns it as a JSON string.
///
/// `publish_input_json` is parsed as a publish input. Its `key_id` is
/// overwritten with the key id obtained from `prepare_seed_attestation`
/// before the envelope is built, then the envelope is signed, validated and
/// serialized by `finalize_signed_envelope_json`.
///
/// Returns an error if parsing, signing, validation or serialization fails.
#[pyfunction]
pub(crate) fn build_publish_envelope_json(
    publish_input_json: &str,
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
) -> PyResult<String> {
    let mut publish_input: facade::PublishInput =
        from_json("parse publish input", publish_input_json)?;

    let (seed_key_id, placeholder_attestation) =
        prepare_seed_attestation(signer_id, signing_key_hex, signed_at)?;
    publish_input.key_id = seed_key_id;

    let unsigned_envelope = facade::build_publish_envelope(&publish_input, placeholder_attestation);
    finalize_signed_envelope_json(
        "serialize publish envelope",
        "validate publish envelope",
        unsigned_envelope,
        signer_id,
        signing_key_hex,
        signed_at,
    )
}

/// Python-exposed entry point for verifying a publish envelope.
///
/// Takes the envelope JSON, a verification mode string and the trust policy
/// JSON, and returns the verification report as a JSON string. All work is
/// delegated to `verify_publish_envelope_with_policy_json`, whose errors are
/// returned unchanged.
#[pyfunction]
pub(crate) fn verify_publish_envelope_json(
    publish_envelope_json: &str,
    mode: &str,
    trust_policy_json: &str,
) -> PyResult<String> {
    verify_publish_envelope_with_policy_json(publish_envelope_json, mode, trust_policy_json)
}

/// Adds this module's envelope functions to the Python module `m`.
///
/// The three envelope builders are registered first, then the publish
/// envelope verifier. Registration stops at the first wrapping or
/// registration error, which is returned.
pub(crate) fn register_envelope_bindings(m: &Bound<'_, PyModule>) -> PyResult<()> {
    let build_capture = wrap_pyfunction!(build_capture_envelope_json, m)?;
    m.add_function(build_capture)?;

    let build_transform = wrap_pyfunction!(build_transform_envelope_json, m)?;
    m.add_function(build_transform)?;

    let build_publish = wrap_pyfunction!(build_publish_envelope_json, m)?;
    m.add_function(build_publish)?;

    let verify_publish = wrap_pyfunction!(verify_publish_envelope_json, m)?;
    m.add_function(verify_publish)?;

    Ok(())
}